diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dd3193fdadff2..ea968a17baf0f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -491,6 +491,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT =
       TimeUnit.SECONDS.toMillis(2);
 
+  public static final String DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT =
+      "dfs.datanode.networkerrors.display.topcount";
+  public static final int DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT_DEFAULT = -1;
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY =
       "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 87e8eee681d1d..d0555e2b213e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -140,6 +140,7 @@
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -2760,6 +2761,28 @@ public int getActiveTransferThreadCount() {
 
   @Override // DataNodeMXBean
   public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
+    int maxDisplay = getConf().getInt(DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT,
+        DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT_DEFAULT);
+    if (maxDisplay >= 0) {
+      ConcurrentMap<String, Map<String, Long>> map = datanodeNetworkCounts.asMap();
+      Set<Map.Entry<String, Map<String, Long>>> entries = map.entrySet();
+      List<Map.Entry<String, Map<String, Long>>> list = new ArrayList<>(entries);
+      // Sort by networkErrors in descending order. Long.compare avoids the
+      // overflow that casting a long difference to int could cause.
+      list.sort((o1, o2) -> {
+        Map<String, Long> value1Map = o1.getValue();
+        Map<String, Long> value2Map = o2.getValue();
+        return Long.compare(value2Map.getOrDefault(DataNode.NETWORK_ERRORS, 0L),
+            value1Map.getOrDefault(DataNode.NETWORK_ERRORS, 0L));
+      });
+      Map<String, Map<String, Long>> resultMap = new ConcurrentHashMap<>();
+      maxDisplay = Math.min(list.size(), maxDisplay);
+      for (int i = 0; i < maxDisplay; i++) {
+        resultMap.put(list.get(i).getKey(), list.get(i).getValue());
+      }
+      list.clear();
+      return resultMap;
+    }
     return datanodeNetworkCounts.asMap();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 9e046cc3600df..41335efbfd530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -178,4 +178,10 @@ public void delayGetMetaDataInputStream() {}
    * leaving a stale copy of {@link DirectoryScanner#diffs}.
    */
   public void waitUntilStorageRemoved() {}
+
+  /**
+   * Increment DatanodeNetworkErrors; overridden by tests to inject network errors.
+   * @param dataXceiver the DataXceiver whose network error count is incremented.
+   */
+  public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index d948c1caefd1f..677620a15b0d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -858,6 +858,7 @@ public void writeBlock(final ExtendedBlock block,
           mirrorOut.flush();
 
           DataNodeFaultInjector.get().writeBlockAfterFlush();
+          DataNodeFaultInjector.get().incrementDatanodeNetworkErrors(this);
 
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
@@ -1391,6 +1392,11 @@ private void incrDatanodeNetworkErrors() {
     datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
   }
 
+  @VisibleForTesting
+  public void incrDatanodeNetworkErrorsWithPort() {
+    datanode.incrDatanodeNetworkErrors(remoteAddress);
+  }
+
   /**
    * Wait until the BP is registered, upto the configured amount of time.
    * Throws an exception if times out, which should fail the client request.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2ab25f8329ce6..e9bfc109cf2c5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6568,6 +6568,16 @@
     problem. In produce default set false, because it's have little performance loss.
   </description>
 </property>
+
+<property>
+  <name>dfs.datanode.networkerrors.display.topcount</name>
+  <value>-1</value>
+  <description>
+    The maximum number of network error entries per datanode displayed in the
+    DatanodeNetworkCounts metric. A negative value means no limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.fsck.connect.timeout</name>
   <value>60000ms</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeNetworkErrorsWithDefaultConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeNetworkErrorsWithDefaultConf.java
new file mode 100644
index 0000000000000..8985a3fab9532
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeNetworkErrorsWithDefaultConf.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+public class TestDataNodeNetworkErrorsWithDefaultConf {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDataNodeNetworkErrorsWithDefaultConf.class);
+
+  @Test(timeout = 60000)
+  public void testDatanodeNetworkErrorsMetricDefaultConf() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
+    cluster.waitActive();
+    final List<FSDataOutputStream> streams = Lists.newArrayList();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+      public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
+        dataXceiver.incrDatanodeNetworkErrorsWithPort();
+      }
+    };
+    DataNodeFaultInjector.set(newInjector);
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            for (int i = 0; i < 100; i++) {
+              final Path path = new Path("/test" + i);
+              final FSDataOutputStream out =
+                  cluster.getFileSystem().create(path, (short) 3);
+              streams.add(out);
+              out.writeBytes("old gs data\n");
+              out.hflush();
+            }
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+
+          final MetricsRecordBuilder dnMetrics =
+              getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+          long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
+          return datanodeNetworkErrors > 10;
+        }
+      }, 1000, 60000);
+
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      final Object dnc =
+          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
+      // Count the networkErrors entries in DatanodeNetworkCounts by counting
+      // occurrences of the keyword in the attribute's string form.
+      final String allDnc = dnc.toString();
+      int oldStringLength = allDnc.length();
+      String keyword = "key=networkErrors, value";
+      int newStringLength = allDnc.replace(keyword, "").length();
+      int networkErrorsCount = (oldStringLength - newStringLength) / keyword.length();
+      final MetricsRecordBuilder dnMetrics =
+          getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+      long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
+      Assert.assertEquals(datanodeNetworkErrors, networkErrorsCount);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeNetworkErrorsWithTopNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeNetworkErrorsWithTopNConf.java
new file mode 100644
index 0000000000000..ae63474099f1e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeNetworkErrorsWithTopNConf.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+public class TestDataNodeNetworkErrorsWithTopNConf {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDataNodeNetworkErrorsWithTopNConf.class);
+
+  @Test(timeout=60000)
+  public void testDatanodeNetworkErrorsMetricTopN() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT, 2);
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
+    cluster.waitActive();
+    final List<FSDataOutputStream> streams = Lists.newArrayList();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+      public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
+        dataXceiver.incrDatanodeNetworkErrorsWithPort();
+      }
+    };
+    DataNodeFaultInjector.set(newInjector);
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            for (int i = 0; i < 100; i++) {
+              final Path path = new Path("/test" + i);
+              final FSDataOutputStream out =
+                  cluster.getFileSystem().create(path, (short) 3);
+              streams.add(out);
+              out.writeBytes("old gs data\n");
+              out.hflush();
+            }
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+
+          final MetricsRecordBuilder dnMetrics =
+              getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+          long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
+          return datanodeNetworkErrors > 10;
+        }
+      }, 1000, 60000);
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName mxbeanName =
+          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
+      final Object dnc =
+          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
+      // Count the networkErrors entries in DatanodeNetworkCounts; only the
+      // configured top 2 datanodes should be displayed.
+      final String allDnc = dnc.toString();
+      int oldStringLength = allDnc.length();
+      String keyword = "key=networkErrors, value";
+      int newStringLength = allDnc.replace(keyword, "").length();
+      int networkErrorsCount = (oldStringLength - newStringLength) / keyword.length();
+      Assert.assertEquals(2, networkErrorsCount);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+}