Allow a block to be replicated on ZFS and DISK
jiajunmao committed Aug 21, 2024
1 parent 9003fc2 commit 7346580
Showing 6 changed files with 55 additions and 47 deletions.
2 changes: 1 addition & 1 deletion dockers/dn1/hdfs-site.xml
@@ -5,7 +5,7 @@
   </property>
   <property>
     <name>dfs.datanode.data.dir</name>
-    <value>[ZFS]/data/dataNode1</value>
+    <value>[ZFS]/data/dataNode1,[ZFS_RECON_BUFFER]/data/recon_buffer1</value>
   </property>
   <property>
     <name>dfs.namenode.heartbeat.recheck-interval</name>
2 changes: 1 addition & 1 deletion dockers/dn2/hdfs-site.xml
@@ -17,7 +17,7 @@
   </property>
   <property>
     <name>dfs.datanode.data.dir</name>
-    <value>[DISK]/data/dataNode2</value>
+    <value>[DISK]/data/dataNode2,[ZFS_RECON_BUFFER]/data/recon_buffer2</value>
   </property>
   <property>
     <name>dfs.namenode.heartbeat.recheck-interval</name>
2 changes: 1 addition & 1 deletion dockers/dn3/hdfs-site.xml
@@ -17,7 +17,7 @@
   </property>
   <property>
     <name>dfs.datanode.data.dir</name>
-    <value>[DISK]/data/dataNode3</value>
+    <value>[DISK]/data/dataNode3,[ZFS_RECON_BUFFER]/data/recon_buffer3</value>
   </property>
   <property>
     <name>dfs.namenode.heartbeat.recheck-interval</name>
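Each datanode's dfs.datanode.data.dir now lists a second directory tagged [ZFS_RECON_BUFFER] next to its existing [ZFS] or [DISK] directory, so replicas of the same block can land on more than one storage medium. The bracketed prefix is how HDFS binds a storage type to a data directory. Below is a minimal, self-contained sketch of how such tagged entries decompose; the parser is hypothetical and stands in for HDFS's actual StorageLocation.parse, and the ZFS and ZFS_RECON_BUFFER storage types exist only in this fork, not in upstream Hadoop.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DataDirTagSketch {
  // Matches entries like "[ZFS]/data/dataNode1":
  // group(1) = storage type tag, group(2) = directory path.
  private static final Pattern TAGGED = Pattern.compile("^\\[(\\w+)\\](.+)$");

  public static void main(String[] args) {
    String conf = "[ZFS]/data/dataNode1,[ZFS_RECON_BUFFER]/data/recon_buffer1";
    for (String entry : conf.split(",")) {
      Matcher m = TAGGED.matcher(entry.trim());
      if (m.matches()) {
        System.out.println("type=" + m.group(1) + "  dir=" + m.group(2));
      }
    }
  }
}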
@@ -2187,7 +2187,7 @@ int computeReconstructionWorkForBlocks(
       // Check whether rw is ZFS
       Set<StorageType> blockStorageTypes = new HashSet<>();
       rw.getBlock().getStorageInfos().forEachRemaining(info -> blockStorageTypes.add(info.getStorageType()));
-      if (blockStorageTypes.size() == 1 && blockStorageTypes.contains(StorageType.ZFS)) {
+      if (blockStorageTypes.contains(StorageType.ZFS)) {
         // The block is ZFS, we would need to restore it back to the same datanode that it is coming from
         // However, we cannot directly write, because that IO pool would be suspended, the write will fail
         // We need to instruct the writer to call a different API
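The old predicate treated a block as ZFS-resident only when every replica's storage type was ZFS; the relaxed predicate fires as soon as any replica sits on ZFS, which is exactly what lets a block replicated on both ZFS and DISK still take the special ZFS reconstruction path. A toy comparison of the two checks, using a stand-in enum rather than the fork's StorageType:

import java.util.EnumSet;
import java.util.Set;

public class ZfsPredicateSketch {
  // Stand-in for the fork's StorageType enum (upstream Hadoop has no ZFS type).
  enum StorageType { DISK, ZFS, ARCHIVE }

  // Old check: block qualifies only if *all* replicas sit on ZFS.
  static boolean oldCheck(Set<StorageType> types) {
    return types.size() == 1 && types.contains(StorageType.ZFS);
  }

  // New check: block qualifies if *any* replica sits on ZFS.
  static boolean newCheck(Set<StorageType> types) {
    return types.contains(StorageType.ZFS);
  }

  public static void main(String[] args) {
    Set<StorageType> mixed = EnumSet.of(StorageType.ZFS, StorageType.DISK);
    System.out.println(oldCheck(mixed));  // false: mixed placement was skipped
    System.out.println(newCheck(mixed));  // true: now takes the ZFS repair path
  }
}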
@@ -2316,7 +2316,9 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
     if (block.isStriped()) {
       BlockInfoStriped stripedBlock = (BlockInfoStriped) block;
       if (stripedBlock.getRealDataBlockNum() > srcNodes.length) {
-        LOG.info("Block {} cannot be reconstructed due to shortage of source datanodes ", block);
+        LOG.info("Block {} cannot be reconstructed due to shortage of source datanodes, " +
+            "real data block num {}, src nodes {}", block, stripedBlock.getRealDataBlockNum(), srcNodes.length);
+        LOG.info("Source nodes {}", srcNodes);
         NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
         return null;
       }
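The guard itself is unchanged: a striped block cannot be reconstructed unless at least getRealDataBlockNum() source nodes are live, because rebuilding one missing unit of an erasure-coded group needs as many inputs as there are data units. The commit only makes the failure log actionable by printing both counts. A toy illustration of that arithmetic, with hypothetical numbers and no HDFS types:

public class StripedSourceGuardSketch {
  public static void main(String[] args) {
    // e.g. an RS-6-3 striped block: 6 data units, 3 parity units.
    int realDataBlockNum = 6;  // sources required to rebuild a missing unit
    int srcNodes = 4;          // live nodes currently holding internal blocks
    if (realDataBlockNum > srcNodes) {
      System.out.println("cannot reconstruct: need " + realDataBlockNum
          + " sources, have only " + srcNodes);
    }
  }
}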
@@ -2660,6 +2662,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
     }

     for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      LOG.info("Checking source storage {}", storage);
       final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
       final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
           storage, corruptReplicas.getNodes(block), false);
@@ -86,7 +86,7 @@ public static BlockStoragePolicySuite createDefaultSuite(
     policies[hotId] = new BlockStoragePolicy(hotId,
         HdfsConstants.StoragePolicy.HOT.name(),
         new StorageType[]{StorageType.DISK, StorageType.ZFS},
-        new StorageType[]{StorageType.ZFS},
+        new StorageType[]{StorageType.DISK, StorageType.ZFS},
         new StorageType[]{StorageType.ARCHIVE, StorageType.ZFS});
     final byte warmId = HdfsConstants.StoragePolicy.WARM.value();
     policies[warmId] = new BlockStoragePolicy(warmId,
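With the old HOT policy the only creation fallback was ZFS itself, so a write that could not be placed on ZFS had nowhere else to go; adding DISK to the creation-fallback array gives block creation a second option. A self-contained sketch of the first-available-fallback idea follows; the helper is hypothetical and mirrors, but does not reproduce, BlockStoragePolicy's fallback lookup.

import java.util.EnumSet;

public class CreationFallbackSketch {
  enum StorageType { DISK, ZFS, ARCHIVE }

  // Return the first fallback type that is not currently unavailable,
  // or null when every fallback is exhausted (the write cannot proceed).
  static StorageType firstAvailable(StorageType[] fallbacks,
                                    EnumSet<StorageType> unavailable) {
    for (StorageType t : fallbacks) {
      if (!unavailable.contains(t)) {
        return t;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    EnumSet<StorageType> zfsDown = EnumSet.of(StorageType.ZFS);
    StorageType[] oldFallbacks = { StorageType.ZFS };
    StorageType[] newFallbacks = { StorageType.DISK, StorageType.ZFS };
    System.out.println(firstAvailable(oldFallbacks, zfsDown)); // null
    System.out.println(firstAvailable(newFallbacks, zfsDown)); // DISK
  }
}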
@@ -558,55 +558,60 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
LOG.info("**********Heartbeating************");
// If there is a repair on-going, we do not call failed chunks
// This is to work around the kernel panic on the dn->dn_holds on the ZFS side
if (ErasureCodingWorker.ongoingRepairs.isEmpty()) {
List<DnodeAttributes> dnodes = new Tools().getFailedChunks("pool");
// This means that there is ZFS local failure
for (DnodeAttributes dnode : dnodes) {
// Enable this during real testing
// if (dnode.numFailedCols() == 0) {
// continue;
// }

// Check whether this dnode is actually on this datanode instance
DatanodeVolumeInfo volumeInfo = this.dn.getVolumeReport().stream()
.filter(vi -> vi.getStorageType() == StorageType.ZFS)
.collect(Collectors.toList())
.get(0);

// 1. Check whether the dnode actually belongs to this datanode volume (for local testing env)
Path path = Paths.get(volumeInfo.getPath(), dnode.path);
if (!Files.exists(path)) {
continue;
}
// Loop around all the volumes on this datanode, and check for ZFS failures
for (DatanodeVolumeInfo volInfo : dn.getVolumeReport()) {
if (volInfo.getStorageType() == StorageType.ZFS) {
if (ErasureCodingWorker.ongoingRepairs.isEmpty()) {
List<DnodeAttributes> dnodes = new Tools().getFailedChunks("pool");
// This means that there is ZFS local failure
for (DnodeAttributes dnode : dnodes) {
// Enable this during real testing
// if (dnode.numFailedCols() == 0) {
// continue;
// }

// Check whether this dnode is actually on this datanode instance
DatanodeVolumeInfo volumeInfo = this.dn.getVolumeReport().stream()
.filter(vi -> vi.getStorageType() == StorageType.ZFS)
.collect(Collectors.toList())
.get(0);

// 1. Check whether the dnode actually belongs to this datanode volume (for local testing env)
Path path = Paths.get(volumeInfo.getPath(), dnode.path);
if (!Files.exists(path)) {
continue;
}

// 2. Check whether this dnode is a hdfs block (rather than directory, metadata, etc)
Optional<Matcher> matcher = ZfsFailureTuple.isHdfsBlock(dnode);
if (!matcher.isPresent()) {
continue;
}
// 2. Check whether this dnode is a hdfs block (rather than directory, metadata, etc)
Optional<Matcher> matcher = ZfsFailureTuple.isHdfsBlock(dnode);
if (!matcher.isPresent()) {
continue;
}

// This means that this is a block file, not directory, not anything else
// Get the block from the file name
long hdfsBlockId = Long.parseLong(matcher.get().group(1));
// This means that this is a block file, not directory, not anything else
// Get the block from the file name
long hdfsBlockId = Long.parseLong(matcher.get().group(1));

// If this is already a known issue, we ignore
if (this.mlecDnMgmt.knownFailures.containsKey(hdfsBlockId)) {
continue;
}
// If this is already a known issue, we ignore
if (this.mlecDnMgmt.knownFailures.containsKey(hdfsBlockId)) {
continue;
}

LOG.info("Failed hdfs block on {}:{} {} corresponding to zfs dn {}", dn.getDatanodeHostname(), dn.getHttpPort(),
hdfsBlockId, dnode);
LOG.info("Failed hdfs block on {}:{} {} corresponding to zfs dn {}", dn.getDatanodeHostname(), dn.getHttpPort(),
hdfsBlockId, dnode);

// MLEC: testing purpose
dnode.childStatus.set(0, 1);
// MLEC: testing purpose
dnode.childStatus.set(0, 1);

ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
zfsReport.getFailedHdfsBlocks().add(failureTuple);
ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
zfsReport.getFailedHdfsBlocks().add(failureTuple);

this.mlecDnMgmt.knownFailures.put(hdfsBlockId, failureTuple);
this.mlecDnMgmt.knownFailures.put(hdfsBlockId, failureTuple);
}
} else {
LOG.warn("REPAIR ON-GOING, skipping");
}
}
} else {
LOG.warn("REPAIR ON-GOING, skipping");
}

HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
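The heartbeat path now walks every volume reported by the datanode and runs the ZFS failure scan only for volumes whose storage type is ZFS, which matters once a datanode also mounts a [ZFS_RECON_BUFFER] or [DISK] directory. One step of that scan maps a failed ZFS dnode back to an HDFS block via its file name; the sketch below is hedged, since the diff does not show ZfsFailureTuple.isHdfsBlock's actual pattern, but HDFS block files are named blk_<blockId>.

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BlockFileMatchSketch {
  // HDFS block files are named "blk_<blockId>"; directories and metadata
  // files do not match, so this check filters them out.
  private static final Pattern BLOCK_FILE = Pattern.compile("blk_(\\d+)$");

  static Optional<Matcher> isHdfsBlock(String dnodePath) {
    Matcher m = BLOCK_FILE.matcher(dnodePath);
    return m.find() ? Optional.of(m) : Optional.empty();
  }

  public static void main(String[] args) {
    isHdfsBlock("data/dataNode1/current/blk_1073741825")
        .ifPresent(m -> System.out.println(
            "failed hdfs block id = " + Long.parseLong(m.group(1))));
  }
}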
