Skip to content

Commit

Permalink
Add more code comments and minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Wikant committed Dec 20, 2024
1 parent b450fee commit 290ff09
Showing 1 changed file with 67 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -55,18 +57,24 @@ public class UnderConstructionBlocks {
private static final Logger LOG =
LoggerFactory.getLogger(UnderConstructionBlocks.class);

// Amount of time to wait in between checking all block replicas
private static final Duration LONG_UNDER_CONSTRUCTION_BLOCK_CHECK_INTERVAL
= Duration.ofMinutes(5);
// Amount of time to wait before logging each individual block replica
// as warning.
// When a block replica is under construction for longer than this threshold,
// then a warning log will be periodically printed to the Namenode log.
// The value of 2 hours is selected to avoid false positive warnings for
// use-cases which require holding blocks open for several minutes.
// This value is not configurable because impact of misconfiguration is low:
// - For use-cases where blocks are expected to be held open for write for
// over 2 hours, there will be un-necessary warning logs in Namenode.
// - For use-cases where datanode decommissioning is expected to complete
// in under 2 hours, there are separate logs printed by DatanodeAdminMonitor
// to identify that decommissioning is blocked on Under Construction blocks.
private static final Duration LONG_UNDER_CONSTRUCTION_BLOCK_WARN_THRESHOLD
= Duration.ofHours(2);
= Duration.ofHours(2);
// After 2 hours
private static final Duration LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL
= Duration.ofMinutes(30);
= Duration.ofMinutes(30);

private final Map<Block, Set<BlockReplica>> replicasByBlockId =
Maps.newHashMap();
private final Map<Block, Set<BlockReplica>> replicasByBlockId
= new ConcurrentHashMap<>();
private final boolean enabled;
// Total count of Under Construction replicas. The count will match the sum
// of the sizes of all the sets of BlockReplicas in "replicasByBlockId".
Expand All @@ -75,22 +83,36 @@ public class UnderConstructionBlocks {
private int count = 0;
// DatanodeAdminMonitor invokes logWarningForLongUnderConstructionBlocks every 30 seconds.
// To reduce the number of times this method loops through the Under Construction blocks,
// the interval is limited by LONG_UNDER_CONSTRUCTION_BLOCK_CHECK_INTERVAL.
private Instant nextWarnLogCheck =
Instant.now().plus(LONG_UNDER_CONSTRUCTION_BLOCK_CHECK_INTERVAL);
// the interval is limited by LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL.
private Instant nextWarnLogCheckTime =
Instant.now().plus(LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL);

/**
* Class representing an Under Construction block replica.
* Contains the Block ID & Datanode ID used to uniquely identify
* the block replica. Also contains timestamps tracking how long
* the block replica has been Under Construction for the purpose of
* logging warnings for blocks which are Under Construction for
* greater than LONG_UNDER_CONSTRUCTION_BLOCK_WARN_THRESHOLD.
*/
static class BlockReplica {
private final Block block;
private final DatanodeDescriptor dn;
private final DatanodeDescriptor dataNodeDescriptor;
private final Instant firstReportedTime;
private Instant nextWarnLog;
private Instant nextWarnLogTime;

/**
* Initializes the Under Construction block replica.
*
* @param block - block ID for the block replica.
* @param dataNodeDescriptor - datanode ID for the datanode storing the block replica.
*/
BlockReplica(Block block,
DatanodeDescriptor dn) {
DatanodeDescriptor dataNodeDescriptor) {
this.block = block;
this.dn = dn;
this.dataNodeDescriptor = dataNodeDescriptor;
this.firstReportedTime = Instant.now();
this.nextWarnLog = firstReportedTime.plus(LONG_UNDER_CONSTRUCTION_BLOCK_WARN_THRESHOLD);
this.nextWarnLogTime = firstReportedTime.plus(LONG_UNDER_CONSTRUCTION_BLOCK_WARN_THRESHOLD);
}

/** @return - block ID for the Under Construction block replica. */
Expand All @@ -100,7 +122,7 @@ Block getBlock() {

/** @return - datanode ID for datanode storing the Under Construction block replica. */
DatanodeDescriptor getDatanode() {
return dn;
return dataNodeDescriptor;
}

/**
Expand All @@ -110,10 +132,10 @@ DatanodeDescriptor getDatanode() {
* @return - boolean indicating if warning should be logged for this block replica.
*/
boolean shouldLogWarning() {
if (Instant.now().isBefore(nextWarnLog)) {
if (Instant.now().isBefore(nextWarnLogTime)) {
return false;
}
nextWarnLog = Instant.now().plus(LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL);
nextWarnLogTime = Instant.now().plus(LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL);
return true;
}

Expand All @@ -124,7 +146,7 @@ Duration getDurationSinceReporting() {

@Override
public String toString() {
return String.format("ReportedBlockInfo [block=%s, dn=%s]", block, dn);
return String.format("BlockReplica [block=%s, dn=%s]", block, dataNodeDescriptor);
}
}

Expand All @@ -134,13 +156,14 @@ public String toString() {
* @param conf - the Hadoop HDFS configuration keys & values.
*/
UnderConstructionBlocks(Configuration conf) {
Objects.requireNonNull(conf, "Configuration cannot be null");
this.enabled = conf.getBoolean(
DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS,
DFSConfigKeys.DFS_DECOMMISSION_TRACK_UNDER_CONSTRUCTION_BLOCKS_DEFAULT);
if (enabled) {
LOG.info("Tracking Under Construction blocks for DatanodeAdminManager");
} else {
LOG.debug("DatanodeAdminManager will not track Under Construction blocks");
LOG.info("DatanodeAdminManager will not track Under Construction blocks");
}
}

Expand All @@ -163,14 +186,18 @@ void removeUcBlock(DatanodeDescriptor reportingNode, Block reportedBlock) {
return;
}
try {
// Extract set of BlockReplicas matching the reportedBlock
Set<BlockReplica> replicas;
// Handle striped blocks differently
if (BlockIdManager.isStripedBlockID(reportedBlock.getBlockId())) {
// Convert striped block ID and create a new Block instance
Block blkId = new Block(BlockIdManager.convertToStripedID(reportedBlock
.getBlockId()));
// Extract set of block replicas matching the reportedBlock
replicas = getBlockReplicas(blkId);
} else {
// Create a new Block instance for non-striped blocks
reportedBlock = new Block(reportedBlock);
// Extract set of block replicas matching the reportedBlock
replicas = getBlockReplicas(reportedBlock);
}
if (replicas.isEmpty()) {
Expand Down Expand Up @@ -202,13 +229,17 @@ void removeUcBlock(DatanodeDescriptor reportingNode, Block reportedBlock) {
private void removeUcBlockFromSet(DatanodeDescriptor reportingNode,
Block reportedBlock,
Set<BlockReplica> storedReplicasForBlock) {
// Extract the set of block replicas for the reportedBlock stored on the reportingNode.
// This reference is used for validation after the block replica is removed from the set.
// Extract the set of block replicas for the reportedBlock stored on the reportingNode
// which have a generation stamp less than or equal to the reportedBlock generation stamp.
// Ignore block replicas which have newer generation stamp than the reportedBlock because
// in this case the reportedBlock is stale/corrupt. This reference is used for validation
// after the block replica is removed from the set.
final List<BlockReplica> storedBlocks = storedReplicasForBlock.stream()
.filter(replica -> reportingNode.equals(replica.getDatanode())
&& reportedBlock.getGenerationStamp() >= replica.getBlock().getGenerationStamp())
.collect(Collectors.toList());
// Stop tracking the block replica for the reportedBlock stored on the reportingNode
// This is done separately from the collection step to modify the original set
storedReplicasForBlock.removeIf(replica -> reportingNode.equals(replica.getDatanode())
&& reportedBlock.getGenerationStamp() >= replica.getBlock().getGenerationStamp());
if (storedReplicasForBlock.isEmpty()) {
Expand All @@ -217,10 +248,11 @@ private void removeUcBlockFromSet(DatanodeDescriptor reportingNode,
replicasByBlockId.remove(reportedBlock);
}

// Log appropriate message based on the number of existing replicas
// Create a string representation of the extracted block replicas for logging purposes
final String storedBlockString = storedBlocks.stream()
.map(br -> br.getBlock().toString())
.collect(Collectors.joining(","));
// Log appropriate message based on the number of existing replicas
if (storedBlocks.size() > 1) {
// Duplicate block replicas were found for the reportingNode. This should never occur
// because each UC replica should only have one copy stored in "replicasByBlockId".
Expand Down Expand Up @@ -302,14 +334,18 @@ void addUcBlock(DatanodeDescriptor reportingNode, Block reportedBlock) {
return;
}
try {
// Extract set of block replicas matching the reportedBlock
Set<BlockReplica> storedReplicasForBlock;
// Handle striped blocks differently
if (BlockIdManager.isStripedBlockID(reportedBlock.getBlockId())) {
// Convert striped block ID and create a new Block instance
Block blkId = new Block(BlockIdManager.convertToStripedID(reportedBlock
.getBlockId()));
// Extract set of block replicas matching the reportedBlock
storedReplicasForBlock = getBlockReplicas(blkId);
} else {
// Create a new Block instance for non-striped blocks
reportedBlock = new Block(reportedBlock);
// Extract set of block replicas matching the reportedBlock
storedReplicasForBlock = getBlockReplicas(reportedBlock);
}

Expand Down Expand Up @@ -338,6 +374,7 @@ private void addUcBlockToSet(DatanodeDescriptor reportingNode,
List<BlockReplica> storedBlocks = storedReplicasForBlock.stream()
.filter(replica -> reportingNode.equals(replica.getDatanode()))
.collect(Collectors.toList());
// Create a string representation of the extracted block replicas for logging purposes
final String storedBlockString = storedBlocks.stream()
.map(br -> br.getBlock().toString())
.collect(Collectors.joining(","));
Expand Down Expand Up @@ -426,11 +463,11 @@ public void logWarningForLongUnderConstructionBlocks() {
}
// DatanodeAdminMonitor invokes logWarningForLongUnderConstructionBlocks every 30 seconds.
// To reduce the number of times this method loops through the Under Construction blocks,
// the interval is limited by LONG_UNDER_CONSTRUCTION_BLOCK_CHECK_INTERVAL.
if (Instant.now().isBefore(nextWarnLogCheck)) {
// the interval is limited by LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL.
if (Instant.now().isBefore(nextWarnLogCheckTime)) {
return;
}
nextWarnLogCheck = Instant.now().plus(LONG_UNDER_CONSTRUCTION_BLOCK_CHECK_INTERVAL);
nextWarnLogCheckTime = Instant.now().plus(LONG_UNDER_CONSTRUCTION_BLOCK_WARN_INTERVAL);

// Log a warning for each Under Construction block replica which meets the conditions.
Stream<BlockReplica> allReplicas = replicasByBlockId.values()
Expand Down

0 comments on commit 290ff09

Please sign in to comment.