Skip to content

Commit

Permalink
HADOOP-18920. RPC Metrics : Optimize logic for log slow RPCs (#6146)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiyang1987 authored Oct 25, 2023
1 parent a170d58 commit f85ac5b
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ public class CommonConfigurationKeysPublic {
"ipc.server.log.slow.rpc";
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;

public static final String IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY =
"ipc.server.log.slow.rpc.threshold.ms";
public static final long IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT = 0;

public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
"ipc.server.purge.interval";
public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,22 @@ protected ResponseBuffer initialValue() {
private final long metricsUpdaterInterval;
private final ScheduledExecutorService scheduledExecutorService;

private boolean logSlowRPC = false;
private volatile boolean logSlowRPC = false;
/** Threshold time for log slow rpc. */
private volatile long logSlowRPCThresholdTime;

/**
* Checks if LogSlowRPC is set true.
* @return true, if LogSlowRPC is set true, false, otherwise.
*/
protected boolean isLogSlowRPC() {
public boolean isLogSlowRPC() {
return logSlowRPC;
}

public long getLogSlowRPCThresholdTime() {
return logSlowRPCThresholdTime;
}

public int getNumInProcessHandler() {
return numInProcessHandler.get();
}
Expand All @@ -543,10 +549,16 @@ public long getTotalRequestsPerSecond() {
* @param logSlowRPCFlag input logSlowRPCFlag.
*/
@VisibleForTesting
protected void setLogSlowRPC(boolean logSlowRPCFlag) {
public void setLogSlowRPC(boolean logSlowRPCFlag) {
this.logSlowRPC = logSlowRPCFlag;
}

@VisibleForTesting
public void setLogSlowRPCThresholdTime(long logSlowRPCThresholdMs) {
this.logSlowRPCThresholdTime = rpcMetrics.getMetricsTimeUnit().
convert(logSlowRPCThresholdMs, TimeUnit.MILLISECONDS);
}

private void setPurgeIntervalNanos(int purgeInterval) {
int tmpPurgeInterval = CommonConfigurationKeysPublic.
IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT;
Expand All @@ -568,12 +580,15 @@ public long getPurgeIntervalNanos() {
* @param methodName - RPC Request method name
* @param details - Processing Detail.
*
* if this request took too much time relative to other requests
* we consider that as a slow RPC. 3 is a magic number that comes
* from 3 sigma deviation. A very simple explanation can be found
* by searching for 68-95-99.7 rule. We flag an RPC as slow RPC
* if and only if it falls above 99.7% of requests. We start this logic
* only once we have enough sample size.
* If a request took significant more time than other requests,
* and its processing time is at least `logSlowRPCThresholdMs` we consider that as a slow RPC.
*
* The definition rules for calculating whether the current request took too much time
* compared to other requests are as follows:
* 3 is a magic number that comes from 3 sigma deviation.
* A very simple explanation can be found by searching for 68-95-99.7 rule.
* We flag an RPC as slow RPC if and only if it falls above 99.7% of requests.
* We start this logic only once we have enough sample size.
*/
void logSlowRpcCalls(String methodName, Call call,
ProcessingDetails details) {
Expand All @@ -587,15 +602,14 @@ void logSlowRpcCalls(String methodName, Call call,
final double threeSigma = rpcMetrics.getProcessingMean() +
(rpcMetrics.getProcessingStdDev() * deviation);

long processingTime =
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
final TimeUnit metricsTimeUnit = rpcMetrics.getMetricsTimeUnit();
long processingTime = details.get(Timing.PROCESSING, metricsTimeUnit);
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
LOG.warn(
"Slow RPC : {} took {} {} to process from client {},"
+ " the processing detail is {}",
methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
details.toString());
(processingTime > threeSigma) &&
(processingTime > getLogSlowRPCThresholdTime())) {
LOG.warn("Slow RPC : {} took {} {} to process from client {}, the processing detail is {}," +
" and the threshold time is {} {}.", methodName, processingTime, metricsTimeUnit,
call, details.toString(), getLogSlowRPCThresholdTime(), metricsTimeUnit);
rpcMetrics.incrSlowRpc();
}
}
Expand Down Expand Up @@ -3359,6 +3373,10 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));

this.setLogSlowRPCThresholdTime(conf.getLong(
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT));

this.setPurgeIntervalNanos(conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,15 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
<name>ipc.server.log.slow.rpc.threshold.ms</name>
<value>0</value>
<description>The threshold in milliseconds for logging slow rpc when ipc.server.log.slow.rpc is enabled.
Besides of being much slower than other RPC requests, an RPC request has to take at least the threshold value
defined by this property before it can be considered as slow. By default, this threshold is set to 0 (disabled).
</description>
</property>

<property>
<name>ipc.server.purge.interval</name>
<value>15</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ public void testLogSlowRPC() throws IOException, ServiceException,
TimeoutException, InterruptedException {
//No test with legacy
assumeFalse(testWithLegacy);
server.setLogSlowRPCThresholdTime(SLEEP_DURATION);
TestRpcService2 client = getClient2();
// make 10 K fast calls
for (int x = 0; x < 10000; x++) {
Expand All @@ -370,7 +371,13 @@ public void testLogSlowRPC() throws IOException, ServiceException,
assertThat(rpcMetrics.getProcessingSampleCount()).isGreaterThan(999L);
long before = rpcMetrics.getRpcSlowCalls();

// make a really slow call. Sleep sleeps for 1000ms
// Sleep sleeps for 500ms(less than `logSlowRPCThresholdTime`),
// make sure we never called into Log slow RPC routine.
client.sleep(null, newSleepRequest(SLEEP_DURATION / 2));
long after = rpcMetrics.getRpcSlowCalls();
assertThat(before).isEqualTo(after);

// Make a really slow call. Sleep sleeps for 3000ms.
client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));

// Ensure slow call is logged.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
Expand Down Expand Up @@ -365,7 +369,9 @@ public enum OperationCategory {
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY));
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY,
IPC_SERVER_LOG_SLOW_RPC,
IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY));

private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
Expand Down Expand Up @@ -2369,6 +2375,9 @@ protected String reconfigurePropertyImpl(String property, String newVal)
newVal);
} else if (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY)) {
return reconfigureMinBlocksForWrite(property, newVal);
} else if (property.equals(IPC_SERVER_LOG_SLOW_RPC) ||
(property.equals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY))) {
return reconfigureLogSlowRPC(property, newVal);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
Expand Down Expand Up @@ -2511,6 +2520,43 @@ String reconfigureIPCBackoffEnabled(String newVal) {
return Boolean.toString(clientBackoffEnabled);
}

String reconfigureLogSlowRPC(String property, String newVal) throws ReconfigurationException {
String result = null;
try {
if (property.equals(IPC_SERVER_LOG_SLOW_RPC)) {
if (newVal != null && !newVal.equalsIgnoreCase("true") &&
!newVal.equalsIgnoreCase("false")) {
throw new IllegalArgumentException(newVal + " is not boolean value");
}
boolean logSlowRPC = (newVal == null ? IPC_SERVER_LOG_SLOW_RPC_DEFAULT :
Boolean.parseBoolean(newVal));
rpcServer.getClientRpcServer().setLogSlowRPC(logSlowRPC);
if (rpcServer.getServiceRpcServer() != null) {
rpcServer.getServiceRpcServer().setLogSlowRPC(logSlowRPC);
}
if (rpcServer.getLifelineRpcServer() != null) {
rpcServer.getLifelineRpcServer().setLogSlowRPC(logSlowRPC);
}
result = Boolean.toString(logSlowRPC);
} else if (property.equals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY)) {
long logSlowRPCThresholdTime = (newVal == null ?
IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT : Long.parseLong(newVal));
rpcServer.getClientRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime);
if (rpcServer.getServiceRpcServer() != null) {
rpcServer.getServiceRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime);
}
if (rpcServer.getLifelineRpcServer() != null) {
rpcServer.getLifelineRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime);
}
result = Long.toString(logSlowRPCThresholdTime);
}
LOG.info("RECONFIGURE* changed reconfigureLogSlowRPC {} to {}", property, result);
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

String reconfigureSPSModeEvent(String newVal, String property)
throws ReconfigurationException {
if (newVal == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.junit.Before;
import org.junit.After;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -701,6 +704,49 @@ public void testReconfigureMinBlocksForWrite() throws Exception {
assertEquals(3, bm.getMinBlocksForWrite(BlockType.STRIPED));
}

@Test
public void testReconfigureLogSlowRPC() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final NameNodeRpcServer nnrs = (NameNodeRpcServer) nameNode.getRpcServer();
// verify default value.
assertFalse(nnrs.getClientRpcServer().isLogSlowRPC());
assertEquals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT,
nnrs.getClientRpcServer().getLogSlowRPCThresholdTime());

// try invalid logSlowRPC.
try {
nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "non-boolean");
fail("should not reach here");
} catch (ReconfigurationException e) {
assertEquals(
"Could not change property ipc.server.log.slow.rpc from 'false' to 'non-boolean'",
e.getMessage());
}

// try correct logSlowRPC.
nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "True");
assertTrue(nnrs.getClientRpcServer().isLogSlowRPC());

// revert to defaults.
nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, null);
assertFalse(nnrs.getClientRpcServer().isLogSlowRPC());

// try invalid logSlowRPCThresholdTime.
try {
nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"ipc.server.log.slow.rpc.threshold.ms from '0' to 'non-numeric'", e.getMessage());
}

// try correct logSlowRPCThresholdTime.
nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
"20000");
assertEquals(nnrs.getClientRpcServer().getLogSlowRPCThresholdTime(), 20000);
}

@After
public void shutDown() throws IOException {
if (cluster != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(23, outs.size());
assertEquals(25, outs.size());
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
Expand Down

0 comments on commit f85ac5b

Please sign in to comment.