Commit 9008daf

add unit tests.

hfutatzhanghb committed Jul 21, 2023
1 parent b826645 commit 9008daf

Showing 4 changed files with 140 additions and 53 deletions.
@@ -117,7 +117,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -162,4 +162,10 @@ public void markSlow(String dnAddr, int[] replies) {}
* Just delay delete replica a while.
*/
public void delayDeleteReplica() {}

/**
* Increase DatanodeNetworkErrors.
* @param dataXceiver the DataXceiver whose network error counter is incremented.
*/
public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) { }
}
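The hook added above is a no-op by default; the new tests further down enable it by installing a custom injector and restoring the previous one afterwards. A minimal sketch of that override-and-restore pattern, using a hypothetical helper class placed in the DataNode package so it can reference DataXceiver (not part of this commit):

package org.apache.hadoop.hdfs.server.datanode;

/** Hypothetical helper; sketches the override-and-restore pattern used by the new tests. */
class NetworkErrorInjectionSketch {
  static void runWithInjection(Runnable workload) {
    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void incrementDatanodeNetworkErrors(DataXceiver xceiver) {
        // Charge the simulated error to the full client address, including the port.
        xceiver.incrDatanodeNetworkErrorsWithPort();
      }
    });
    try {
      workload.run(); // e.g. write files through a MiniDFSCluster so writeBlock() fires the hook
    } finally {
      DataNodeFaultInjector.set(oldInjector); // always restore the default injector
    }
  }
}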
@@ -848,6 +848,7 @@ public void writeBlock(final ExtendedBlock block,
mirrorOut.flush();

DataNodeFaultInjector.get().writeBlockAfterFlush();
DataNodeFaultInjector.get().incrementDatanodeNetworkErrors(this);

// read connect ack (only for clients, not for replication req)
if (isClient) {
@@ -1381,6 +1382,11 @@ private void incrDatanodeNetworkErrors() {
datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
}

@VisibleForTesting
public void incrDatanodeNetworkErrorsWithPort() {
datanode.incrDatanodeNetworkErrors(remoteAddress);
}

/**
* Wait until the BP is registered, up to the configured amount of time.
* Throws an exception if times out, which should fail the client request.
@@ -19,7 +19,6 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -44,6 +43,7 @@
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.util.Lists;
import org.junit.Assert;
import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -374,6 +374,133 @@ public void testTimeoutMetric() throws Exception {
}
}

@Test(timeout=60000)
public void testDatanodeNetworkErrorsMetricDefaultConf() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
final List<FSDataOutputStream> streams = Lists.newArrayList();

DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
@Override
public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
dataXceiver.incrDatanodeNetworkErrorsWithPort();
}
};
DataNodeFaultInjector.set(newInjector);
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
for (int i = 0; i < 100; i++) {
final Path path = new Path("/test" + i);
final FSDataOutputStream out =
cluster.getFileSystem().create(path, (short) 3);
streams.add(out);
out.writeBytes("old gs data\n");
out.hflush();
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}

final MetricsRecordBuilder dnMetrics =
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
return datanodeNetworkErrors > 10;
}
}, 1000, 60000);

/* Test JMX datanode network counts. */
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
final ObjectName mxbeanName =
new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
final Object dnc =
mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");

// Count the networkErrors entries in DatanodeNetworkCounts by keyword
// occurrence (see the standalone sketch after this method).
final String allDnc = dnc.toString();
int oldStringLength = allDnc.length();
String keyword = "key=networkErrors, value";
int newStringLength = allDnc.replace(keyword, "").length();
int networkErrorsCount = (oldStringLength - newStringLength) / keyword.length();
final MetricsRecordBuilder dnMetrics =
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
Assert.assertEquals(datanodeNetworkErrors, networkErrorsCount);
} finally {
IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
if (cluster != null) {
cluster.shutdown();
}
DataNodeFaultInjector.set(oldInjector);
}
}
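The entry count above comes from a length-difference trick: each occurrence of the marker removed by replace() shortens the string by keyword.length(), so dividing the total shrinkage by the keyword length gives the number of occurrences. A standalone sketch of the same idea with a hypothetical sample string (not part of the commit):

/** Standalone sketch of the occurrence-counting trick used in the test above. */
public class OccurrenceCountSketch {
  static int countOccurrences(String text, String keyword) {
    // Removing every occurrence shrinks the string by keyword.length() per hit.
    return (text.length() - text.replace(keyword, "").length()) / keyword.length();
  }

  public static void main(String[] args) {
    // Hypothetical JMX toString() output with two client entries.
    String dnc = "{127.0.0.1:53814={key=networkErrors, value=1}, "
        + "127.0.0.1:53820={key=networkErrors, value=1}}";
    System.out.println(countOccurrences(dnc, "key=networkErrors, value")); // prints 2
  }
}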

@Test(timeout=60000)
public void testDatanodeNetworkErrorsMetricTopN() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_NETWORKERRORS_DISPLAY_TOPCOUNT, 2);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
final List<FSDataOutputStream> streams = Lists.newArrayList();

DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
@Override
public void incrementDatanodeNetworkErrors(DataXceiver dataXceiver) {
dataXceiver.incrDatanodeNetworkErrorsWithPort();
}
};
DataNodeFaultInjector.set(newInjector);
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
for (int i = 0; i < 100; i++) {
final Path path = new Path("/test" + i);
final FSDataOutputStream out =
cluster.getFileSystem().create(path, (short) 3);
streams.add(out);
out.writeBytes("old gs data\n");
out.hflush();
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}

final MetricsRecordBuilder dnMetrics =
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
long datanodeNetworkErrors = getLongCounter("DatanodeNetworkErrors", dnMetrics);
return datanodeNetworkErrors > 10;
}
}, 1000, 60000);
/* Test JMX datanode network counts. */
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
final ObjectName mxbeanName =
new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
final Object dnc =
mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");

// Count the networkErrors entries in DatanodeNetworkCounts by keyword occurrence.
final String allDnc = dnc.toString();
int oldStringLength = allDnc.length();
String keyword = "key=networkErrors, value";
int newStringLength = allDnc.replace(keyword, "").length();
int networkErrorsCount = (oldStringLength - newStringLength) / keyword.length();
Assert.assertEquals(2, networkErrorsCount);
} finally {
IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
if (cluster != null) {
cluster.shutdown();
}
DataNodeFaultInjector.set(oldInjector);
}
}
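Both tests count entries by scanning the attribute's toString() output. If the MXBean mapping surfaces DatanodeNetworkCounts as javax.management.openmbean.TabularData (an assumption suggested by the key=/value= layout in that string, not something this commit relies on), the same count could be taken through the open-type API instead. A hedged sketch:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;

/** Hedged sketch: count networkErrors entries via open MBean types instead of toString(). */
public class NetworkErrorEntryCountSketch {
  static int countNetworkErrorEntries() throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
    // Assumption: the Map<String, Map<String, Long>> attribute is surfaced as TabularData.
    TabularData clients = (TabularData) mbs.getAttribute(name, "DatanodeNetworkCounts");
    int entries = 0;
    for (Object row : clients.values()) {
      CompositeData client = (CompositeData) row;           // one row per client address
      TabularData counters = (TabularData) client.get("value");
      for (Object counterRow : counters.values()) {
        CompositeData counter = (CompositeData) counterRow; // e.g. key=networkErrors, value=N
        if ("networkErrors".equals(counter.get("key"))) {
          entries++;
        }
      }
    }
    return entries;
  }
}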

/**
* This function ensures that writing causes TotalWritetime to increment
* and reading causes totalReadTime to move.
@@ -752,55 +879,4 @@ public void testNodeLocalMetrics() throws Exception {
}
}
}

@Test(timeout=60000)
public void testDatanodeNetworkErrorsMetricDefaultConf() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setStrings("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.setInt("dfs.client.block.write.replace-datanode-on-failure.min-replication", 0);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(6).build();
final List<FSDataOutputStream> streams = Lists.newArrayList();
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();

try {
for (int i = 0; i < 3; i++) {
final Path path = new Path("/test" + i);
final FSDataOutputStream out =
cluster.getFileSystem().create(path, (short) 2);
final DataNodeFaultInjector injector = Mockito.mock
(DataNodeFaultInjector.class);
Mockito.doThrow(new IOException("mock IOException")).
when(injector).
writeBlockAfterFlush();
DataNodeFaultInjector.set(injector);
streams.add(out);
out.writeBytes("old gs data\n");
out.hflush();
}
/* Test the metric. */
final MetricsRecordBuilder dnMetrics =
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
//assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
assertCounterGt("DatanodeNetworkErrors", 1L, dnMetrics);

/* Test JMX datanode network counts. */
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
final ObjectName mxbeanName =
new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
final Object dnc =
mbs.getAttribute(mxbeanName, "DatanodeNetworkCounts");
final String allDnc = dnc.toString();
assertTrue("expected to see loopback address",
allDnc.indexOf("127.0.0.1") >= 0);
assertTrue("expected to see networkErrors",
allDnc.indexOf("networkErrors") >= 0);
} finally {
IOUtils.cleanupWithLogger(LOG, streams.toArray(new Closeable[0]));
if (cluster != null) {
cluster.shutdown();
}
DataNodeFaultInjector.set(oldInjector);
}
}
}
