datanode should report zfs failure log to the namenode
jiajunmao committed Aug 1, 2024
1 parent 4093591 commit ba1f67d
Showing 18 changed files with 137 additions and 120 deletions.

This file was deleted.

@@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
@@ -47,20 +48,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
@@ -72,6 +61,7 @@
import org.apache.hadoop.thirdparty.protobuf.RpcController;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

@@ -128,15 +118,28 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
return PBHelper.convert(resp.getRegistration());
}

public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
return this.sendHeartbeat(
registration, reports, cacheCapacity, cacheUsed,
xmitsInProgress, xceiverCount, failedVolumes,
volumeFailureSummary, requestFullBlockReportLease, slowPeers, slowDisks, null);
}

@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks)
throws IOException {
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
ZfsFailureReport zfsFailureReport) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -159,6 +162,9 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
if (slowDisks.haveSlowDisks()) {
builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
}
if (zfsFailureReport != null) {
builder.setZfsFailureReport(PBHelper.convertZfsFailureReport(zfsFailureReport));
}

HeartbeatResponseProto resp;
resp = ipc(() -> rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()));
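A minimal caller-side sketch of the new 13-argument overload; all heartbeat arguments are assumed to be in scope exactly as the datanode already builds them, translator is a DatanodeProtocolClientSideTranslatorPB, and zfsReport is the report assembled from the ZFS scan as in the BPServiceActor change further below:

    // The original 12-argument overload simply forwards null for the report,
    // so existing callers are unaffected; new callers attach the ZFS failure report last.
    HeartbeatResponse resp = translator.sendHeartbeat(
        registration, reports, cacheCapacity, cacheUsed,
        xmitsInProgress, xceiverCount, failedVolumes,
        volumeFailureSummary, requestFullBlockReportLease,
        slowPeers, slowDisks, zfsReport);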
@@ -122,7 +122,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
PBHelper.convertZfsFailureReport(request.getZfsFailureReport()));
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -24,7 +24,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.hdfs.server.blockmanagement.ZfsFailureTuple;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.thirdparty.protobuf.ByteString;

import org.apache.hadoop.fs.StorageType;
@@ -53,6 +56,8 @@
.SlowDiskReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ZfsFailureReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ZfsFailureTupleProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
@@ -89,38 +94,12 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;

/**
* Utilities for converting protobuf classes to and from implementation classes
@@ -848,6 +827,32 @@ public static VolumeFailureSummaryProto convertVolumeFailureSummary(
return builder.build();
}

public static ZfsFailureReportProto convertZfsFailureReport(ZfsFailureReport zfsFailureReport) {
return ZfsFailureReportProto.newBuilder()
.addAllFailedHdfsBlocks(zfsFailureReport.getFailedHdfsBlocks()
.stream()
.map(PBHelper::convertZfsFailureTuple)
.collect(Collectors.toList()))
.build();
}

public static ZfsFailureReport convertZfsFailureReport(ZfsFailureReportProto proto) {
return new ZfsFailureReport(proto.getFailedHdfsBlocksList().stream()
.map(PBHelper::convertZfsFailureTuple)
.collect(Collectors.toList()));
}

public static ZfsFailureTupleProto convertZfsFailureTuple(ZfsFailureTuple tuple) {
return ZfsFailureTupleProto.newBuilder()
.setFailedBlock(tuple.getFailedBlock())
.addAllEcIndex(tuple.getEcIndex())
.build();
}

public static ZfsFailureTuple convertZfsFailureTuple(ZfsFailureTupleProto proto) {
return new ZfsFailureTuple(proto.getFailedBlock(), proto.getEcIndexList());
}

public static List<SlowPeerReportProto> convertSlowPeerInfo(
SlowPeerReports slowPeers) {
if (slowPeers.getSlowPeers().size() == 0) {
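A round-trip sketch for the new converters; values are illustrative, java.util.Arrays and java.util.Collections are assumed imported, and the ecIndex list follows the ZfsFailureTuple convention that a non-zero entry marks a failed chunk:

    // tuple/report -> proto -> tuple/report should preserve the block id and per-chunk statuses
    ZfsFailureTuple tuple = new ZfsFailureTuple(1073741825L, Arrays.asList(0, 1, 0));
    ZfsFailureReport report = new ZfsFailureReport(Collections.singletonList(tuple));

    ZfsFailureReportProto proto = PBHelper.convertZfsFailureReport(report);
    ZfsFailureReport back = PBHelper.convertZfsFailureReport(proto);

    assert back.getFailedHdfsBlocks().size() == 1;
    assert back.getFailedHdfsBlocks().get(0).getFailedBlock() == 1073741825L;
    assert back.getFailedHdfsBlocks().get(0).getEcIndex().equals(Arrays.asList(0, 1, 0));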
@@ -1813,7 +1813,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
@Nonnull SlowDiskReports slowDisks,
ZfsFailureReport zfsFailureReport) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
@@ -11,12 +11,12 @@ public class ZfsFailureTuple {
// Which HDFS block failed
private Long failedBlock;

// Which chunks in the HDFS block has failed
// Which chunks in the HDFS block have failed; a non-zero entry means that chunk failed
private List<Integer> ecIndex;
// ----------------------------------------------


// After grabbing from ZFS API, we will find the DSI from namenode and populate this field
// After grabbing from the ZFS API, we will find the DSI from the name node and populate this field
// This is for convenience
private DatanodeStorageInfo datanodeStorageInfo;

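A small construction sketch to make the field semantics above concrete (values are illustrative; ecIndex mirrors dnode.childStatus as passed in BPServiceActor, and datanodeStorageInfo is left unset because the name node populates it after receiving the report):

    // One failed HDFS block; chunks 2 and 8 carry a non-zero status, i.e. they failed on ZFS.
    ZfsFailureTuple tuple = new ZfsFailureTuple(1073741825L,
        Arrays.asList(0, 0, 1, 0, 0, 0, 0, 0, 1));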
@@ -48,14 +48,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.ZfsFailureTuple;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -559,6 +555,11 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
List<DnodeAttributes> dnodes = new Tools().getFailedChunks("pool");
// This means that there is ZFS local failure
for (DnodeAttributes dnode : dnodes) {
// Enable this during real testing
// if (dnode.numFailedCols() == 0) {
// continue;
// }

// 1. We get the failed block from the file name
String[] blockFilePath = dnode.path.split("/");
String regex = "^blk_-(\\d+)$";
@@ -568,13 +569,16 @@
if (matcher.matches()) {
// This means that this is a block file, not directory, not anything else
// Get the block from the file name
double hdfsBlockId = Long.parseLong(matcher.group(1));
long hdfsBlockId = Long.parseLong(matcher.group(1));
Block block = new Block(hdfsBlockId);

LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode.toString());

ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
zfsReport.getFailedHdfsBlocks().add(failureTuple);
}
}


HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
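A self-contained restatement of the block-file parsing step above (the regex and capture group are taken from the loop; applying them to the last path component is an assumption, since those lines are collapsed in this view, and the helper name and -1 sentinel are illustrative):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    /** Extracts the HDFS block id from a ZFS-reported path such as ".../blk_-1073741825";
     *  returns -1 if the last path component is not a block file. */
    static long parseBlockId(String zfsPath) {
      String[] parts = zfsPath.split("/");
      Matcher matcher = Pattern.compile("^blk_-(\\d+)$").matcher(parts[parts.length - 1]);
      return matcher.matches() ? Long.parseLong(matcher.group(1)) : -1L;
    }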
@@ -114,6 +114,7 @@
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;

import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotDeletionGc;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
@@ -128,7 +129,6 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Daemon;
@@ -301,19 +301,6 @@
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.io.IOUtils;
@@ -4427,15 +4414,16 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks)
@Nonnull SlowDiskReports slowDisks,
ZfsFailureReport zfsReport)
throws IOException {
readLock();
try {
//get datanode commands
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary,
slowPeers, slowDisks);
slowPeers, slowDisks, zfsReport);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
