diff --git a/dockers/hdfs-site.xml b/dockers/hdfs-site.xml
index 5056bab1177c5..c2900e1343ac0 100644
--- a/dockers/hdfs-site.xml
+++ b/dockers/hdfs-site.xml
@@ -9,7 +9,7 @@
<name>dfs.namenode.heartbeat.recheck-interval</name>
- <value>5000</value>
+ <value>10000</value>
<name>dfs.namenode.ec.system.default.policy</name>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 7733c0d09f0ed..eaffaf05ea97c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1815,6 +1815,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
ZfsFailureReport zfsFailureReport) throws IOException {
+
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
@@ -1831,6 +1832,30 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
+
+ // MLEC handling: process HDFS blocks that ZFS reported as failed
+ // 1. Look up the stored block info for each entry in the ZFS failure report
+ if (!zfsFailureReport.getFailedHdfsBlocks().isEmpty()) {
+ LOG.info("heartbeat contains {} zfs failed hdfs blocks", zfsFailureReport.getFailedHdfsBlocks().size());
+ LOG.info("=======");
+ }
+
+ zfsFailureReport.getFailedHdfsBlocks().forEach(zfsFailTuple -> {
+ final BlockInfo block = blockManager.getStoredBlock(new Block(zfsFailTuple.getFailedBlock()));
+
+ // 2. Check for the block redundancy
+ short expected = blockManager.getExpectedRedundancyNum(block);
+
+ final NumberReplicas n = blockManager.countNodes(block);
+ final int pending = blockManager.pendingReconstruction.getNumReplicas(block);
+ final boolean hasEnoughReplica = blockManager.hasEnoughEffectiveReplicas(block, n, pending);
+ LOG.info("Expected {}, num replica {}, pending {}, enough replica {}",
+ expected, n, pending, hasEnoughReplica);
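+ // 3. Not enough effective replicas: queue the block for reconstruction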
+ if (!hasEnoughReplica) {
+ blockManager.scheduleReconstruction(block, 0);
+ }
+ });
+
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d27ac299be4c7..995362b6fa285 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -517,7 +517,6 @@ void heartbeatCheck() {
for (DatanodeDescriptor dead : deadDatanodes) {
LOG.warn("Data node {} is dead", dead.getName());
-
if (dead.getStorageTypes().size() == 1 && dead.getStorageTypes().contains(StorageType.ZFS)) {
LOG.warn("Failed data node is type ZFS");
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ZfsFailureTuple.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ZfsFailureTuple.java
index 7ea85cfc7973a..7de8f9d600f98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ZfsFailureTuple.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ZfsFailureTuple.java
@@ -1,11 +1,18 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
+import jni.DnodeAttributes;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class ZfsFailureTuple {
+ private final static String regex = "^blk_(-?\\d+)$";
+ private final static Pattern pattern = Pattern.compile(regex);
+
// ----------------------------------------------
// The next two fields are from ZFS API
// Which HDFS block failed
@@ -57,4 +64,15 @@ public String toString() {
", datanodeStorageInfo=" + datanodeStorageInfo +
'}';
}
+
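+ /**
+ * If the dnode's path names an HDFS block file ("blk_" followed by a block id),
+ * returns the matcher whose group(1) is that id; otherwise returns empty.
+ */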
+ public static Optional<Matcher> isHdfsBlock(DnodeAttributes dnode) {
+ String[] blockFilePath = dnode.path.split("/");
+ Matcher matcher = pattern.matcher(blockFilePath[blockFilePath.length - 1]);
+
+ if (matcher.matches()) {
+ return Optional.of(matcher);
+ } else {
+ return Optional.empty();
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index fe92588702c9d..d467f0c9bfe2b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -26,14 +26,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -42,10 +38,12 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import jni.DnodeAttributes;
import jni.Tools;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.*;
@@ -116,6 +114,8 @@ enum RunningState {
= new LinkedList<BPServiceActorAction>();
private final CommandProcessingThread commandProcessingThread;
+ private MlecDatanodeManagement mlecDnMgmt;
+
BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
this.bpos = bpos;
@@ -142,6 +142,7 @@ enum RunningState {
if (nnId != null) {
this.nnId = nnId;
}
+ this.mlecDnMgmt = new MlecDatanodeManagement();
commandProcessingThread = new CommandProcessingThread(this);
commandProcessingThread.start();
}
@@ -560,23 +561,44 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
// continue;
// }
+ // Check whether this dnode is actually on this datanode instance
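+ // Look up this node's ZFS volume (assumes a single ZFS-typed volume)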
+ DatanodeVolumeInfo volumeInfo = this.dn.getVolumeReport().stream()
+ .filter(vi -> vi.getStorageType() == StorageType.ZFS)
+ .collect(Collectors.toList())
+ .get(0);
+
+ // 1. Check whether the dnode actually belongs to this datanode volume (for local testing env)
+ Path path = Paths.get(volumeInfo.getPath(), dnode.path);
+ if (!Files.exists(path)) {
+ continue;
+ }
+
// 1. We get the failed block from the file name
+ String[] volumePath = volumeInfo.getPath().split("/");
String[] blockFilePath = dnode.path.split("/");
- String regex = "^blk_-(\\d+)$";
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(blockFilePath[blockFilePath.length - 1]);
- if (matcher.matches()) {
- // This means that this is a block file, not directory, not anything else
- // Get the block from the file name
- long hdfsBlockId = Long.parseLong(matcher.group(1));
- Block block = new Block(hdfsBlockId);
+ // 2. Check whether this dnode is a hdfs block (rather than directory, metadata, etc)
+ Optional<Matcher> matcher = ZfsFailureTuple.isHdfsBlock(dnode);
+ if (!matcher.isPresent()) {
+ continue;
+ }
- LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode.toString());
+ // This means that this is a block file, not directory, not anything else
+ // Get the block from the file name
+ // TODO: figure out why we need to subtract 1 from the block id here
+ long hdfsBlockId = Long.parseLong(matcher.get().group(1)) - 1;
- ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
- zfsReport.getFailedHdfsBlocks().add(failureTuple);
+ // If this failure is already known, skip it
+ if (this.mlecDnMgmt.knownFailures.containsKey(hdfsBlockId)) {
+ continue;
}
+
+ LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode);
+
+ ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
+ zfsReport.getFailedHdfsBlocks().add(failureTuple);
+
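+ // Remember this failure so later heartbeats do not re-report it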
+ this.mlecDnMgmt.knownFailures.put(hdfsBlockId, failureTuple);
}
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
@@ -589,7 +611,8 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
- slowDisks);
+ slowDisks,
+ zfsReport);
scheduler.updateLastHeartbeatResponseTime(monotonicNow());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/MlecDatanodeManagement.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/MlecDatanodeManagement.java
new file mode 100644
index 0000000000000..7a11547999e2b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/MlecDatanodeManagement.java
@@ -0,0 +1,15 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.ZfsFailureTuple;
+
+import java.util.HashMap;
+import java.util.Map;
+
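+/**
+ * Tracks ZFS block failures this datanode has already reported to the
+ * NameNode, so the same failure is not re-sent on every heartbeat.
+ */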
+public class MlecDatanodeManagement {
+
+ public Map<Long, ZfsFailureTuple> knownFailures = new HashMap<>();
+
+ public MlecDatanodeManagement() {
+ this.knownFailures = new HashMap<>();
+ }
+}