Fix repeated sending of local failure reports
jiajunmao committed Aug 8, 2024
1 parent ba1f67d commit 80731a6
Showing 6 changed files with 102 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dockers/hdfs-site.xml
@@ -9,7 +9,7 @@
</property>
<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>5000</value>
<value>10000</value>
</property>
<property>
<name>dfs.namenode.ec.system.default.policy</name>
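For reference, stock HDFS only declares a DataNode dead after 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval, so with the default 3-second heartbeat this change moves the dead-node timeout from roughly 2*5000 + 10*3000 = 40000 ms to roughly 2*10000 + 10*3000 = 50000 ms. A minimal pairing of the two properties (the heartbeat value below is the stock default, shown only for illustration; it is not part of this commit):

    <property>
      <name>dfs.heartbeat.interval</name>
      <value>3</value> <!-- seconds; DataNodes heartbeat to the NameNode at this rate -->
    </property>
    <property>
      <name>dfs.namenode.heartbeat.recheck-interval</name>
      <value>10000</value> <!-- milliseconds; dead-node timeout = 2 * this + 10 * heartbeat interval -->
    </property>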
@@ -1815,6 +1815,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
ZfsFailureReport zfsFailureReport) throws IOException {

final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
@@ -1831,6 +1832,30 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}

// MLEC: handle ZFS failure reports piggybacked on this heartbeat
// 1. Look up each HDFS block that the ZFS failure report marks as failed
if (!zfsFailureReport.getFailedHdfsBlocks().isEmpty()) {
LOG.info("heartbeat contains {} zfs failed hdfs blocks", zfsFailureReport.getFailedHdfsBlocks().size());
LOG.info("=======");
}

zfsFailureReport.getFailedHdfsBlocks().forEach(zfsFailTuple -> {
final BlockInfo block = blockManager.getStoredBlock(new Block(zfsFailTuple.getFailedBlock()));

// 2. Check whether the block still has enough redundancy
short expected = blockManager.getExpectedRedundancyNum(block);

final NumberReplicas n = blockManager.countNodes(block);
final int pending = blockManager.pendingReconstruction.getNumReplicas(block);
final boolean hasEnoughReplica = blockManager.hasEnoughEffectiveReplicas(block, n, pending);
LOG.info("Expected {}, num replica {}, pending {}, enough replica {}",
expected, n, pending, hasEnoughReplica);
if (!hasEnoughReplica) {
blockManager.scheduleReconstruction(block, 0);
}
});

heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);

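One note on the block lookup in the loop above: BlockManager.getStoredBlock can return null when the reported block id is not (or is no longer) in the blocks map, in which case the subsequent redundancy check would fail with a NullPointerException. A defensive guard along these lines is a sketch only, reusing identifiers from the commit, not part of it:

    final BlockInfo block = blockManager.getStoredBlock(new Block(zfsFailTuple.getFailedBlock()));
    if (block == null) {
      // Block is unknown to this NameNode (already deleted or never reported); skip this tuple.
      LOG.warn("ZFS failure report references unknown block {}", zfsFailTuple.getFailedBlock());
      return; // returns from the forEach lambda, i.e. continues with the next tuple
    }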
@@ -517,7 +517,6 @@ void heartbeatCheck() {
for (DatanodeDescriptor dead : deadDatanodes) {
LOG.warn("Data node {} is dead", dead.getName());


if (dead.getStorageTypes().size() == 1 && dead.getStorageTypes().contains(StorageType.ZFS)) {
LOG.warn("Failed data node is type ZFS");
}
@@ -1,11 +1,18 @@
package org.apache.hadoop.hdfs.server.blockmanagement;

import jni.DnodeAttributes;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;

import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZfsFailureTuple {

private final static String regex = "^blk_(-?\\d+)$";
private final static Pattern pattern = Pattern.compile(regex);

// ----------------------------------------------
// The next two fields are from ZFS API
// Which HDFS block failed
@@ -57,4 +64,15 @@ public String toString() {
", datanodeStorageInfo=" + datanodeStorageInfo +
'}';
}

public static Optional<Matcher> isHdfsBlock(DnodeAttributes dnode) {
String[] blockFilePath = dnode.path.split("/");
Matcher matcher = pattern.matcher(blockFilePath[blockFilePath.length - 1]);

if (matcher.matches()) {
return Optional.of(matcher);
} else {
return Optional.empty();
}
}
}
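For context, HDFS block files on disk are named blk_<blockId>, while each block's checksum companion is named blk_<blockId>_<generationStamp>.meta, so the pattern above accepts only the data file itself. A small standalone illustration (the sample names below are made up):

    Pattern pattern = Pattern.compile("^blk_(-?\\d+)$");
    System.out.println(pattern.matcher("blk_1073741825").matches());           // true  -> block data file
    System.out.println(pattern.matcher("blk_1073741825_1001.meta").matches()); // false -> checksum/metadata file
    System.out.println(pattern.matcher("subdir0").matches());                  // false -> directory entry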
@@ -26,14 +26,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -42,10 +38,12 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import jni.DnodeAttributes;
import jni.Tools;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.*;
@@ -116,6 +114,8 @@ enum RunningState {
= new LinkedList<BPServiceActorAction>();
private final CommandProcessingThread commandProcessingThread;

private MlecDatanodeManagement mlecDnMgmt;

BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
this.bpos = bpos;
@@ -142,6 +142,7 @@ enum RunningState {
if (nnId != null) {
this.nnId = nnId;
}
this.mlecDnMgmt = new MlecDatanodeManagement();
commandProcessingThread = new CommandProcessingThread(this);
commandProcessingThread.start();
}
@@ -560,23 +561,44 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
// continue;
// }

// Check whether this dnode is actually on this datanode instance
DatanodeVolumeInfo volumeInfo = this.dn.getVolumeReport().stream()
.filter(vi -> vi.getStorageType() == StorageType.ZFS)
.collect(Collectors.toList())
.get(0);

// 1. Check whether the dnode actually belongs to this datanode volume (for local testing env)
Path path = Paths.get(volumeInfo.getPath(), dnode.path);
if (!Files.exists(path)) {
continue;
}

// 1. We get the failed block from the file name
String[] volumePath = volumeInfo.getPath().split("/");
String[] blockFilePath = dnode.path.split("/");
String regex = "^blk_-(\\d+)$";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(blockFilePath[blockFilePath.length - 1]);

if (matcher.matches()) {
// This means that this is a block file, not directory, not anything else
// Get the block from the file name
long hdfsBlockId = Long.parseLong(matcher.group(1));
Block block = new Block(hdfsBlockId);
// 2. Check whether this dnode is an HDFS block (rather than a directory, metadata, etc.)
Optional<Matcher> matcher = ZfsFailureTuple.isHdfsBlock(dnode);
if (!matcher.isPresent()) {
continue;
}

LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode.toString());
// This means that this is a block file, not directory, not anything else
// Get the block from the file name
// TODO: figure out why we need to -1 here
long hdfsBlockId = Long.parseLong(matcher.get().group(1)) - 1;

ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
zfsReport.getFailedHdfsBlocks().add(failureTuple);
// If this failure is already known, skip it
if (this.mlecDnMgmt.knownFailures.containsKey(hdfsBlockId)) {
continue;
}

LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode);

ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
zfsReport.getFailedHdfsBlocks().add(failureTuple);

this.mlecDnMgmt.knownFailures.put(hdfsBlockId, failureTuple);
}

HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
@@ -589,7 +611,8 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks);
slowDisks,
zfsReport);

scheduler.updateLastHeartbeatResponseTime(monotonicNow());

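In the sendHeartBeat changes above, the ZFS volume is selected with collect(Collectors.toList()).get(0), which throws IndexOutOfBoundsException if the DataNode reports no ZFS volume at all. A findFirst-based guard is one alternative, sketched here under the assumption that a DataNode has at most one ZFS volume (identifiers are reused from the commit; the snippet itself is not part of it):

    Optional<DatanodeVolumeInfo> zfsVolume = this.dn.getVolumeReport().stream()
            .filter(vi -> vi.getStorageType() == StorageType.ZFS)
            .findFirst();
    if (!zfsVolume.isPresent()) {
        continue; // no ZFS volume on this DataNode, so this dnode cannot belong to it
    }
    DatanodeVolumeInfo volumeInfo = zfsVolume.get();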
@@ -0,0 +1,15 @@
package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.hdfs.server.blockmanagement.ZfsFailureTuple;

import java.util.HashMap;
import java.util.Map;

public class MlecDatanodeManagement {

public Map<Long, ZfsFailureTuple> knownFailures = new HashMap<>();

public MlecDatanodeManagement() {
this.knownFailures = new HashMap<>();
}
}
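knownFailures is a plain HashMap that is both field-initialized and re-created in the constructor; that is fine as long as it is only touched from the heartbeat path in BPServiceActor. If it were ever shared with another thread (for example the command-processing thread), a concurrent map would be the safer choice. A sketch of that variant, purely as an assumption about future use and not something this commit requires:

    public class MlecDatanodeManagement {
        // One entry per HDFS block id that has already been reported as failed.
        public final Map<Long, ZfsFailureTuple> knownFailures = new java.util.concurrent.ConcurrentHashMap<>();
    }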
