diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 397d81f92f60b..006144e64ad15 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -504,6 +504,10 @@ public class CommonConfigurationKeysPublic { "ipc.server.log.slow.rpc"; public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false; + public static final String IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY = + "ipc.server.log.slow.rpc.threshold.ms"; + public static final long IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT = 0; + public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY = "ipc.server.purge.interval"; public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 73c86c09fc79e..53497e9707807 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -516,16 +516,22 @@ protected ResponseBuffer initialValue() { private final long metricsUpdaterInterval; private final ScheduledExecutorService scheduledExecutorService; - private boolean logSlowRPC = false; + private volatile boolean logSlowRPC = false; + /** Threshold time for log slow rpc. */ + private volatile long logSlowRPCThresholdTime; /** * Checks if LogSlowRPC is set true. * @return true, if LogSlowRPC is set true, false, otherwise. */ - protected boolean isLogSlowRPC() { + public boolean isLogSlowRPC() { return logSlowRPC; } + public long getLogSlowRPCThresholdTime() { + return logSlowRPCThresholdTime; + } + public int getNumInProcessHandler() { return numInProcessHandler.get(); } @@ -543,10 +549,16 @@ public long getTotalRequestsPerSecond() { * @param logSlowRPCFlag input logSlowRPCFlag. */ @VisibleForTesting - protected void setLogSlowRPC(boolean logSlowRPCFlag) { + public void setLogSlowRPC(boolean logSlowRPCFlag) { this.logSlowRPC = logSlowRPCFlag; } + @VisibleForTesting + public void setLogSlowRPCThresholdTime(long logSlowRPCThresholdMs) { + this.logSlowRPCThresholdTime = rpcMetrics.getMetricsTimeUnit(). + convert(logSlowRPCThresholdMs, TimeUnit.MILLISECONDS); + } + private void setPurgeIntervalNanos(int purgeInterval) { int tmpPurgeInterval = CommonConfigurationKeysPublic. IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT; @@ -568,12 +580,15 @@ public long getPurgeIntervalNanos() { * @param methodName - RPC Request method name * @param details - Processing Detail. * - * if this request took too much time relative to other requests - * we consider that as a slow RPC. 3 is a magic number that comes - * from 3 sigma deviation. A very simple explanation can be found - * by searching for 68-95-99.7 rule. We flag an RPC as slow RPC - * if and only if it falls above 99.7% of requests. We start this logic - * only once we have enough sample size. + * If a request took significant more time than other requests, + * and its processing time is at least `logSlowRPCThresholdMs` we consider that as a slow RPC. + * + * The definition rules for calculating whether the current request took too much time + * compared to other requests are as follows: + * 3 is a magic number that comes from 3 sigma deviation. + * A very simple explanation can be found by searching for 68-95-99.7 rule. + * We flag an RPC as slow RPC if and only if it falls above 99.7% of requests. + * We start this logic only once we have enough sample size. */ void logSlowRpcCalls(String methodName, Call call, ProcessingDetails details) { @@ -587,15 +602,14 @@ void logSlowRpcCalls(String methodName, Call call, final double threeSigma = rpcMetrics.getProcessingMean() + (rpcMetrics.getProcessingStdDev() * deviation); - long processingTime = - details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit()); + final TimeUnit metricsTimeUnit = rpcMetrics.getMetricsTimeUnit(); + long processingTime = details.get(Timing.PROCESSING, metricsTimeUnit); if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) && - (processingTime > threeSigma)) { - LOG.warn( - "Slow RPC : {} took {} {} to process from client {}," - + " the processing detail is {}", - methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call, - details.toString()); + (processingTime > threeSigma) && + (processingTime > getLogSlowRPCThresholdTime())) { + LOG.warn("Slow RPC : {} took {} {} to process from client {}, the processing detail is {}," + + " and the threshold time is {} {}.", methodName, processingTime, metricsTimeUnit, + call, details.toString(), getLogSlowRPCThresholdTime(), metricsTimeUnit); rpcMetrics.incrSlowRpc(); } } @@ -3359,6 +3373,10 @@ protected Server(String bindAddress, int port, CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC, CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT)); + this.setLogSlowRPCThresholdTime(conf.getLong( + CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY, + CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT)); + this.setPurgeIntervalNanos(conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY, CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT)); diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 6c3597a83fa69..d64abf79407ae 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2526,6 +2526,15 @@ The switch to turn S3A auditing on or off. + + ipc.server.log.slow.rpc.threshold.ms + 0 + The threshold in milliseconds for logging slow rpc when ipc.server.log.slow.rpc is enabled. + Besides of being much slower than other RPC requests, an RPC request has to take at least the threshold value + defined by this property before it can be considered as slow. By default, this threshold is set to 0 (disabled). + + + ipc.server.purge.interval 15 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index 0740f056c8fc9..a9eaccb3bf3df 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -355,6 +355,7 @@ public void testLogSlowRPC() throws IOException, ServiceException, TimeoutException, InterruptedException { //No test with legacy assumeFalse(testWithLegacy); + server.setLogSlowRPCThresholdTime(SLEEP_DURATION); TestRpcService2 client = getClient2(); // make 10 K fast calls for (int x = 0; x < 10000; x++) { @@ -370,7 +371,13 @@ public void testLogSlowRPC() throws IOException, ServiceException, assertThat(rpcMetrics.getProcessingSampleCount()).isGreaterThan(999L); long before = rpcMetrics.getRpcSlowCalls(); - // make a really slow call. Sleep sleeps for 1000ms + // Sleep sleeps for 500ms(less than `logSlowRPCThresholdTime`), + // make sure we never called into Log slow RPC routine. + client.sleep(null, newSleepRequest(SLEEP_DURATION / 2)); + long after = rpcMetrics.getRpcSlowCalls(); + assertThat(before).isEqualTo(after); + + // Make a really slow call. Sleep sleeps for 3000ms. client.sleep(null, newSleepRequest(SLEEP_DURATION * 3)); // Ensure slow call is logged. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index bee7db315de5c..df490ea0d9fe0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -126,6 +126,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY; @@ -365,7 +369,9 @@ public enum OperationCategory { DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, - DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY)); + DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY, + IPC_SERVER_LOG_SLOW_RPC, + IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY)); private static final String USAGE = "Usage: hdfs namenode [" + StartupOption.BACKUP.getName() + "] | \n\t[" @@ -2369,6 +2375,9 @@ protected String reconfigurePropertyImpl(String property, String newVal) newVal); } else if (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY)) { return reconfigureMinBlocksForWrite(property, newVal); + } else if (property.equals(IPC_SERVER_LOG_SLOW_RPC) || + (property.equals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY))) { + return reconfigureLogSlowRPC(property, newVal); } else { throw new ReconfigurationException(property, newVal, getConf().get( property)); @@ -2511,6 +2520,43 @@ String reconfigureIPCBackoffEnabled(String newVal) { return Boolean.toString(clientBackoffEnabled); } + String reconfigureLogSlowRPC(String property, String newVal) throws ReconfigurationException { + String result = null; + try { + if (property.equals(IPC_SERVER_LOG_SLOW_RPC)) { + if (newVal != null && !newVal.equalsIgnoreCase("true") && + !newVal.equalsIgnoreCase("false")) { + throw new IllegalArgumentException(newVal + " is not boolean value"); + } + boolean logSlowRPC = (newVal == null ? IPC_SERVER_LOG_SLOW_RPC_DEFAULT : + Boolean.parseBoolean(newVal)); + rpcServer.getClientRpcServer().setLogSlowRPC(logSlowRPC); + if (rpcServer.getServiceRpcServer() != null) { + rpcServer.getServiceRpcServer().setLogSlowRPC(logSlowRPC); + } + if (rpcServer.getLifelineRpcServer() != null) { + rpcServer.getLifelineRpcServer().setLogSlowRPC(logSlowRPC); + } + result = Boolean.toString(logSlowRPC); + } else if (property.equals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY)) { + long logSlowRPCThresholdTime = (newVal == null ? + IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT : Long.parseLong(newVal)); + rpcServer.getClientRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime); + if (rpcServer.getServiceRpcServer() != null) { + rpcServer.getServiceRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime); + } + if (rpcServer.getLifelineRpcServer() != null) { + rpcServer.getLifelineRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime); + } + result = Long.toString(logSlowRPCThresholdTime); + } + LOG.info("RECONFIGURE* changed reconfigureLogSlowRPC {} to {}", property, result); + return result; + } catch (IllegalArgumentException e) { + throw new ReconfigurationException(property, newVal, getConf().get(property), e); + } + } + String reconfigureSPSModeEvent(String newVal, String property) throws ReconfigurationException { if (newVal == null diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java index 63d3a45fff81e..5a0f62a8117e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java @@ -29,6 +29,9 @@ import org.junit.Before; import org.junit.After; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY; import static org.junit.Assert.*; @@ -701,6 +704,49 @@ public void testReconfigureMinBlocksForWrite() throws Exception { assertEquals(3, bm.getMinBlocksForWrite(BlockType.STRIPED)); } + @Test + public void testReconfigureLogSlowRPC() throws ReconfigurationException { + final NameNode nameNode = cluster.getNameNode(); + final NameNodeRpcServer nnrs = (NameNodeRpcServer) nameNode.getRpcServer(); + // verify default value. + assertFalse(nnrs.getClientRpcServer().isLogSlowRPC()); + assertEquals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT, + nnrs.getClientRpcServer().getLogSlowRPCThresholdTime()); + + // try invalid logSlowRPC. + try { + nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "non-boolean"); + fail("should not reach here"); + } catch (ReconfigurationException e) { + assertEquals( + "Could not change property ipc.server.log.slow.rpc from 'false' to 'non-boolean'", + e.getMessage()); + } + + // try correct logSlowRPC. + nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "True"); + assertTrue(nnrs.getClientRpcServer().isLogSlowRPC()); + + // revert to defaults. + nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, null); + assertFalse(nnrs.getClientRpcServer().isLogSlowRPC()); + + // try invalid logSlowRPCThresholdTime. + try { + nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY, + "non-numeric"); + fail("Should not reach here"); + } catch (ReconfigurationException e) { + assertEquals("Could not change property " + + "ipc.server.log.slow.rpc.threshold.ms from '0' to 'non-numeric'", e.getMessage()); + } + + // try correct logSlowRPCThresholdTime. + nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY, + "20000"); + assertEquals(nnrs.getClientRpcServer().getLogSlowRPCThresholdTime(), 20000); + } + @After public void shutDown() throws IOException { if (cluster != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 70a8bab8b0905..1712c620d2c82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -442,7 +442,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("namenode", address, outs, errs); - assertEquals(23, outs.size()); + assertEquals(25, outs.size()); assertTrue(outs.get(0).contains("Reconfigurable properties:")); assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1)); assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));