HDFS-16993. Datanode supports configuring TopN DatanodeNetworkCounts #5597

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

DFSConfigKeys.java
@@ -491,6 +491,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT =
      TimeUnit.SECONDS.toMillis(2);

  public static final String DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT =
      "dfs.datanode.networkerrors.display.topcount";
  public static final int DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT_DEFAULT = -1;

  public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
  public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;

DataNode.java
@@ -140,6 +140,7 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -2760,6 +2761,28 @@ public int getActiveTransferThreadCount() {

  @Override // DataNodeMXBean
  public Map<String, Map<String, Long>> getDatanodeNetworkCounts() {
    int maxDisplay = getConf().getInt(DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT,
        DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT_DEFAULT);
    if (maxDisplay >= 0) {

Review comment (Contributor): Can we first determine the size of the map? If it is less than N, we can return it directly.

      ConcurrentMap<String, Map<String, Long>> map = datanodeNetworkCounts.asMap();
      Set<Map.Entry<String, Map<String, Long>>> entries = map.entrySet();
      List<Map.Entry<String, Map<String, Long>>> list = new ArrayList<>(entries);
      // Sort hosts by networkErrors in descending order. Long.compare avoids
      // the overflow that a subtraction-based comparator cast to int can hit.
      list.sort((o1, o2) -> {
        Map<String, Long> value1Map = o1.getValue();
        Map<String, Long> value2Map = o2.getValue();
        return Long.compare(value2Map.getOrDefault(DataNode.NETWORK_ERRORS, 0L),
            value1Map.getOrDefault(DataNode.NETWORK_ERRORS, 0L));
      });
      Map<String, Map<String, Long>> resultMap = new ConcurrentHashMap<>();
      maxDisplay = Math.min(list.size(), maxDisplay);
      for (int i = 0; i < maxDisplay; i++) {
        resultMap.put(list.get(i).getKey(), list.get(i).getValue());
      }
      list.clear();
      return resultMap;
    }
    return datanodeNetworkCounts.asMap();
  }
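
Following the review comment, a minimal sketch of the size-check optimization, assuming the same map shape as the diff above (TopNCounts and topN are illustrative names, not part of this PR):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper: early return plus an overflow-safe descending sort. */
final class TopNCounts {
  static Map<String, Map<String, Long>> topN(
      Map<String, Map<String, Long>> counts, String errorKey, int maxDisplay) {
    // Reviewer's point: a map no larger than the limit needs no sorting.
    if (maxDisplay < 0 || counts.size() <= maxDisplay) {
      return counts;
    }
    List<Map.Entry<String, Map<String, Long>>> list =
        new ArrayList<>(counts.entrySet());
    list.sort(Comparator.comparingLong(
        (Map.Entry<String, Map<String, Long>> e) ->
            e.getValue().getOrDefault(errorKey, 0L)).reversed());
    // LinkedHashMap keeps the descending order in the returned view.
    Map<String, Map<String, Long>> result = new LinkedHashMap<>();
    for (Map.Entry<String, Map<String, Long>> e : list.subList(0, maxDisplay)) {
      result.put(e.getKey(), e.getValue());
    }
    return result;
  }
}

A call site would then reduce to return TopNCounts.topN(datanodeNetworkCounts.asMap(), NETWORK_ERRORS, maxDisplay); the LinkedHashMap preserves the descending order in the returned view, which a ConcurrentHashMap would not.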

DataNodeFaultInjector.java
@@ -178,4 +178,10 @@ public void delayGetMetaDataInputStream() {}
   * leaving a stale copy of {@link DirectoryScanner#diffs}.
   */
  public void waitUntilStorageRemoved() {}

  /**
   * Increment DatanodeNetworkErrors.
   * @param dataXceiver the DataXceiver whose network-error count should be incremented.
   */
  public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {}
}
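
This no-op hook is the seam the new tests use to force the error-counting path without real network failures. A minimal sketch of a test-time override, mirroring the anonymous subclass in the tests below:

// Sketch: count a network error (keyed by host:port) on every writeBlock
// flush, then restore the previous injector afterwards, as the tests do.
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
  @Override
  public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
    dataXceiver.incrDatanodeNetworkErrorsWithPort();
  }
});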

DataXceiver.java
@@ -858,6 +858,7 @@ public void writeBlock(final ExtendedBlock block,
      mirrorOut.flush();

      DataNodeFaultInjector.get().writeBlockAfterFlush();
      DataNodeFaultInjector.get().incrementDatanodeNetworkErrors(this);

      // read connect ack (only for clients, not for replication req)
      if (isClient) {
@@ -1391,6 +1392,11 @@ private void incrDatanodeNetworkErrors() {
    datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
  }

  @VisibleForTesting
  public void incrDatanodeNetworkErrorsWithPort() {
    datanode.incrDatanodeNetworkErrors(remoteAddress);
  }

  /**
   * Wait until the BP is registered, up to the configured amount of time.
   * Throws an exception if it times out, which should fail the client request.

hdfs-default.xml
@@ -6568,6 +6568,16 @@
    problem. In production the default is false, because it has little performance loss.
  </description>
</property>

<property>
  <name>dfs.datanode.networkerrors.display.topcount</name>
  <value>-1</value>
  <description>
    The maximum number of DatanodeNetworkCounts entries (one per remote host)
    that a datanode displays. A negative value means no limit.
  </description>
</property>
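
Operators who want the truncated view can override the default in hdfs-site.xml; a minimal sketch (the value 5 is only an example):

<property>
  <name>dfs.datanode.networkerrors.display.topcount</name>
  <value>5</value>
</property>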

<property>
  <name>dfs.client.fsck.connect.timeout</name>
  <value>60000ms</value>

TestDataNodeNetworkErrorsWithDefaultConf.java (new file)
@@ -0,0 +1,110 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.function.Supplier;

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

public class TestDataNodeNetworkErrorsWithDefaultConf {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestDataNodeNetworkErrorsWithDefaultConf.class);

  @Test(timeout = 60000)
  public void testDatanodeNetworkErrorsMetricDefaultConf() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
    cluster.waitActive();
    final List<FSDataOutputStream> streams = Lists.newArrayList();
    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
    // Inject an error on every writeBlock flush, keyed by host:port so each
    // client connection produces a distinct DatanodeNetworkCounts entry.
    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
      @Override
      public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
        dataXceiver.incrDatanodeNetworkErrorsWithPort();
      }
    };
    DataNodeFaultInjector.set(newInjector);
    try {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          try {
            for (int i = 0; i < 100; i++) {
              final Path path = new Path("/test" + i);
              final FSDataOutputStream out =
                  cluster.getFileSystem().create(path, (short) 3);
              streams.add(out);
              out.writeBytes("old gs data\n");
              out.hflush();
            }
          } catch (IOException e) {
            LOG.error("Failed to write file.", e);
          }

          final MetricsRecordBuilder dnMetrics =
              getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
          long datanodeNetworkErrors =
              getLongCounter("DatanodeNetworkErrors", dnMetrics);
          return datanodeNetworkErrors > 10;
        }
      }, 1000, 60000);

      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      final ObjectName mxbeanName =
          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
      final Object dnc =
          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
      // Compute the number of DatanodeNetworkCounts entries by counting
      // occurrences of the per-entry keyword in the bean's string form.
      final String allDnc = dnc.toString();
      int oldStringLength = allDnc.length();
      String keyword = "key=networkErrors, value";
      int newStringLength = allDnc.replace(keyword, "").length();
      int networkErrorsCount = (oldStringLength - newStringLength) / keyword.length();
      // Without a top-N limit every host:port entry is exposed; each entry is
      // expected to be incremented once, so the count should match the total.
      final MetricsRecordBuilder dnMetrics =
          getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
      long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
      Assert.assertEquals(datanodeNetworkErrors, networkErrorsCount);
    } finally {
      IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
      if (cluster != null) {
        cluster.shutdown();
      }
      DataNodeFaultInjector.set(oldInjector);
    }
  }
}

TestDataNodeNetworkErrorsWithTopNConf.java (new file)
@@ -0,0 +1,109 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.function.Supplier;

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

public class TestDataNodeNetworkErrorsWithTopNConf {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestDataNodeNetworkErrorsWithTopNConf.class);

  @Test(timeout = 60000)
  public void testDatanodeNetworkErrorsMetricTopN() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    // Expose only the two entries with the highest networkErrors counts.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT, 2);
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
    cluster.waitActive();
    final List<FSDataOutputStream> streams = Lists.newArrayList();
    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
    // Inject an error on every writeBlock flush, keyed by host:port so each
    // client connection produces a distinct DatanodeNetworkCounts entry.
    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
      @Override
      public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
        dataXceiver.incrDatanodeNetworkErrorsWithPort();
      }
    };
    DataNodeFaultInjector.set(newInjector);
    try {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          try {
            for (int i = 0; i < 100; i++) {
              final Path path = new Path("/test" + i);
              final FSDataOutputStream out =
                  cluster.getFileSystem().create(path, (short) 3);
              streams.add(out);
              out.writeBytes("old gs data\n");
              out.hflush();
            }
          } catch (IOException e) {
            LOG.error("Failed to write file.", e);
          }

          final MetricsRecordBuilder dnMetrics =
              getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
          long datanodeNetworkErrors =
              getLongCounter("DatanodeNetworkErrors", dnMetrics);
          return datanodeNetworkErrors > 10;
        }
      }, 1000, 60000);
      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      final ObjectName mxbeanName =
          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
      final Object dnc =
          mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
      // Compute the number of DatanodeNetworkCounts entries by counting
      // occurrences of the per-entry keyword in the bean's string form.
      final String allDnc = dnc.toString();
      int oldStringLength = allDnc.length();
      String keyword = "key=networkErrors, value";
      int newStringLength = allDnc.replace(keyword, "").length();
      int networkErrorsCount = (oldStringLength - newStringLength) / keyword.length();
      // Only the configured top 2 entries should be visible over JMX.
      Assert.assertEquals(2, networkErrorsCount);
    } finally {
      IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
      if (cluster != null) {
        cluster.shutdown();
      }
      DataNodeFaultInjector.set(oldInjector);
    }
  }
}