diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dd3193fdadff2..d85e7c5823192 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1744,6 +1744,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT =
       false;
 
+  public static final String DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY =
+      "dfs.datanode.dataset.sublock.count";
+  public static final long DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT = 1000L;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
new file mode 100644
index 0000000000000..7ba1df8df5232
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
+
+/**
+ * This interface is used to generate a sub lock name for a given block id.
+ */
+public interface DataSetSubLockStrategy {
+
+  /**
+   * Generate the sub lock name for the given block id.
+   * @param blockid the block id.
+   * @return the sub lock name for the input block id.
+   */
+  String blockIdToSubLock(long blockid);
+
+  List<String> getAllSubLockName();
+}
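The interface above is deliberately small: a pure mapping from block id to sub lock name, plus an enumeration of every name that mapping can produce, so the lock manager can register all DIR-level locks up front. A minimal usage sketch against the mod-based implementation added later in this patch (the harness lines are illustrative only, not part of the change):

    DataSetSubLockStrategy strategy = new ModDataSetSubLockStrategy(1000L);
    // Each block id maps deterministically to one of the 1000 sub lock names.
    String subLock = strategy.blockIdToSubLock(1073742025L); // "SubLock25" (1073742025 % 1000 == 25)
    // getAllSubLockName() is what activateVolume()/addBlockPool() iterate to pre-create the locks.
    assert strategy.getAllSubLockName().size() == 1000;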
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
index dc5d70756277c..c98ff5413bd85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
@@ -21,8 +21,6 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -129,31 +127,6 @@ public static File idToBlockDir(File root, long blockId) {
     return new File(root, path);
   }
 
-  /**
-   * Take an example. We hava a block with blockid mapping to:
-   * "/data1/hadoop/hdfs/datanode/current/BP-xxxx/current/finalized/subdir0/subdir0"
-   * We return "subdir0/subdir0"
-   * @param blockId blockId
-   * @return The two-level subdir name
-   */
-  public static String idToBlockDirSuffixName(long blockId) {
-    int d1 = (int) ((blockId >> 16) & 0x1F);
-    int d2 = (int) ((blockId >> 8) & 0x1F);
-    return DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
-        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
-  }
-
-  public static List<String> getAllSubDirNameForDataSetLock() {
-    List<String> res = new ArrayList<>();
-    for (int d1 = 0; d1 <= 0x1F; d1++) {
-      for (int d2 = 0; d2 <= 0x1F; d2++) {
-        res.add(DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
-            DataStorage.BLOCK_SUBDIR_PREFIX + d2);
-      }
-    }
-    return res;
-  }
-
   /**
    * @return the FileInputStream for the meta data of the given block.
    * @throws FileNotFoundException
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
new file mode 100644
index 0000000000000..3f22ca7a0b8ca
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ModDataSetSubLockStrategy implements DataSetSubLockStrategy {
+  public static final Logger LOG = LoggerFactory.getLogger(ModDataSetSubLockStrategy.class);
+
+  private static final String LOCK_NAME_PREFIX = "SubLock";
+  private final long modFactor;
+
+  public ModDataSetSubLockStrategy(long mod) {
+    if (mod <= 0) {
+      mod = 1L;
+    }
+    this.modFactor = mod;
+  }
+
+  @Override
+  public String blockIdToSubLock(long blockid) {
+    return LOCK_NAME_PREFIX + String.valueOf(blockid % modFactor);
+  }
+
+  @Override
+  public List<String> getAllSubLockName() {
+    List<String> res = new ArrayList<>();
+    for (long i = 0L; i < modFactor; i++) {
+      res.add(LOCK_NAME_PREFIX + i);
+    }
+    return res;
+  }
+}
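Note the behavioral difference from the removed DatanodeUtil helpers: the old scheme derived the sub lock from a block's two-level storage subdirectory, yielding a fixed 32 x 32 = 1024 lock names tied to on-disk layout, whereas the mod strategy produces exactly `mod` names independent of layout and tunable per deployment. A small illustrative check (hypothetical snippet, assuming only the classes in this patch):

    DataSetSubLockStrategy s = new ModDataSetSubLockStrategy(4L);
    // Block ids congruent modulo 4 share a sub lock: 10 % 4 == 6 % 4 == 2.
    s.blockIdToSubLock(10L); // "SubLock2"
    s.blockIdToSubLock(6L);  // "SubLock2"
    s.getAllSubLockName();   // [SubLock0, SubLock1, SubLock2, SubLock3]

One caveat a reviewer might flag: Java's % operator preserves the sign of the dividend, so a legacy negative block id would produce a name such as "SubLock-2" that never appears in getAllSubLockName().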
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 6598a8c3dca81..91b12daef8143 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -65,9 +65,11 @@
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataSetSubLockStrategy;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.ModDataSetSubLockStrategy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -200,7 +202,7 @@ public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
     try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, bpid,
         getReplicaInfo(bpid, blkid).getStorageUuid(),
-        DatanodeUtil.idToBlockDirSuffixName(blkid))) {
+        datasetSubLockStrategy.blockIdToSubLock(blkid))) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -289,6 +291,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   private long lastDirScannerNotifyTime;
   private volatile long lastDirScannerFinishTime;
 
+  private final DataSetSubLockStrategy datasetSubLockStrategy;
+  private final long datasetSubLockCount;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -393,6 +398,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
     lastDirScannerNotifyTime = System.currentTimeMillis();
+    datasetSubLockCount = conf.getLong(DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT);
+    this.datasetSubLockStrategy = new ModDataSetSubLockStrategy(datasetSubLockCount);
   }
 
   /**
@@ -431,7 +439,7 @@ private synchronized void activateVolume(
       FsVolumeReference ref) throws IOException {
     for (String bp : volumeMap.getBlockPoolList()) {
       lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
-      List<String> allSubDirNameForDataSetLock = DatanodeUtil.getAllSubDirNameForDataSetLock();
+      List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
       for (String dir : allSubDirNameForDataSetLock) {
         lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir);
         LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
@@ -636,7 +644,7 @@ public void removeVolumes(
     for (String storageUuid : storageToRemove) {
       storageMap.remove(storageUuid);
       for (String bp : volumeMap.getBlockPoolList()) {
-        List<String> allSubDirNameForDataSetLock = DatanodeUtil.getAllSubDirNameForDataSetLock();
+        List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
         for (String dir : allSubDirNameForDataSetLock) {
           lockManager.removeLock(LockLevel.DIR, bp, storageUuid, dir);
           LOG.info("Removed DIR lock for bpid:{}, volume storageid:{}, dir:{}",
@@ -834,7 +842,7 @@ public InputStream getBlockInputStream(ExtendedBlock b,
     ReplicaInfo info;
     try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -930,7 +938,7 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
     try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1397,7 +1405,7 @@ public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client
@@ -1451,7 +1459,7 @@ private ReplicaInPipeline append(String bpid,
       throws IOException {
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
         replicaInfo.getStorageUuid(),
-        DatanodeUtil.idToBlockDirSuffixName(replicaInfo.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
       // If the block is cached, start uncaching it.
       if (replicaInfo.getState() != ReplicaState.FINALIZED) {
         throw new IOException("Only a Finalized replica can be appended to; "
@@ -1549,7 +1557,7 @@ public ReplicaHandler recoverAppend(
     try {
       try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
           getStorageUuidForLock(b),
-          DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+          datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
         ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
         FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
         ReplicaInPipeline replica;
@@ -1584,7 +1592,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
     try {
       try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
           getStorageUuidForLock(b),
-          DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+          datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
         // check replica's state
         ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
         // bump the replica's GS
@@ -1671,7 +1679,7 @@ public ReplicaHandler createRbw(
     ReplicaInPipeline newReplicaInfo;
     try (AutoCloseableLock l = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
         v.getStorageID(),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       newReplicaInfo = v.createRbw(b);
       if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
         throw new IOException("CreateRBW returned a replica of state "
@@ -1703,7 +1711,7 @@ public ReplicaHandler recoverRbw(
     try {
       try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
           getStorageUuidForLock(b),
-          DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+          datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
         ReplicaInfo replicaInfo =
             getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
         // check the replica's state
@@ -1736,7 +1744,7 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       throws IOException {
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       // check generation stamp
       long replicaGenerationStamp = rbw.getGenerationStamp();
       if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1799,7 +1807,7 @@ public ReplicaInPipeline convertTemporaryToRbw(
     long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
       final long visible = b.getNumBytes();
@@ -1940,7 +1948,7 @@ public ReplicaHandler createTemporary(StorageType storageType,
         ReplicaInPipeline newReplicaInfo;
         try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
             b.getBlockPoolId(), v.getStorageID(),
-            DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+            datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
           try {
             newReplicaInfo = v.createTemporary(b);
             LOG.debug("creating temporary for block: {} on volume: {}",
@@ -1999,7 +2007,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
     long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
         throw new IOException("Cannot finalize block: " + b + " from Interrupted Thread");
@@ -2037,7 +2045,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
         replicaInfo.getStorageUuid(),
-        DatanodeUtil.idToBlockDirSuffixName(replicaInfo.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
       // Compare generation stamp of old and new replica before finalizing
       if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
           > replicaInfo.getGenerationStamp()) {
@@ -2088,7 +2096,7 @@ public void unfinalizeBlock(ExtendedBlock b) throws IOException {
     long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, b.getBlockPoolId(),
         getStorageUuidForLock(b),
-        DatanodeUtil.idToBlockDirSuffixName(b.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
       if (replicaInfo != null &&
@@ -2487,7 +2495,7 @@ boolean removeReplicaFromMem(final ExtendedBlock block, final FsVolumeImpl volum
     final Block localBlock = block.getLocalBlock();
     final long blockId = localBlock.getBlockId();
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, volume.getStorageID(),
-        DatanodeUtil.idToBlockDirSuffixName(blockId))) {
+        datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       final ReplicaInfo info = volumeMap.get(bpid, localBlock);
       if (info == null) {
         ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
@@ -2577,7 +2585,7 @@ private void cacheBlock(String bpid, long blockId) {
       return;
     }
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
-        info.getStorageUuid(), DatanodeUtil.idToBlockDirSuffixName(blockId))) {
+        info.getStorageUuid(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       boolean success = false;
       try {
         info = volumeMap.get(bpid, blockId);
@@ -2775,7 +2783,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
     }
     String storageUuid = vol.getStorageID();
     try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
-        vol.getStorageID(), DatanodeUtil.idToBlockDirSuffixName(blockId))) {
+        vol.getStorageID(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
       if (!storageMap.containsKey(storageUuid)) {
         // Storage was already removed
         return;
@@ -3031,8 +3039,8 @@ ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
       }
       LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
          + ", replica=" + replica);
-      try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.DIR, bpid,
-          replica.getStorageUuid(), DatanodeUtil.idToBlockDirSuffixName(block.getBlockId()))) {
+      try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
+          replica.getStorageUuid())) {
        return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
      }
    } catch (MustStopExistingWriter e) {
@@ -3053,8 +3061,8 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
       }
       LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
          + ", replica=" + replica);
-      try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.DIR, bpid,
-          replica.getStorageUuid(), DatanodeUtil.idToBlockDirSuffixName(block.getBlockId()))) {
+      try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
+          replica.getStorageUuid())) {
        return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
      }
    } catch (MustStopExistingWriter e) {
@@ -3262,7 +3270,7 @@ public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
     try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, block.getBlockPoolId(),
         getStorageUuidForLock(block),
-        DatanodeUtil.idToBlockDirSuffixName(block.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3289,7 +3297,7 @@ public void addBlockPool(String bpid, Configuration conf)
     Set<String> vols = storageMap.keySet();
     for (String v : vols) {
       lockManager.addLock(LockLevel.VOLUME, bpid, v);
-      List<String> allSubDirNameForDataSetLock = DatanodeUtil.getAllSubDirNameForDataSetLock();
+      List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
       for (String dir : allSubDirNameForDataSetLock) {
         lockManager.addLock(LockLevel.DIR, bpid, v, dir);
         LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
@@ -3424,7 +3432,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
     try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, block.getBlockPoolId(),
         getStorageUuidForLock(block),
-        DatanodeUtil.idToBlockDirSuffixName(block.getBlockId()))) {
+        datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
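Two review notes on the FsDatasetImpl changes. First, the sub lock count is now configurable: the constructor reads dfs.datanode.dataset.sublock.count (default 1000) and builds one ModDataSetSubLockStrategy per dataset. A sketch of the wiring (hypothetical standalone snippet; the real construction happens in the FsDatasetImpl constructor hunk above):

    Configuration conf = new HdfsConfiguration();
    long subLockCount = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY,      // "dfs.datanode.dataset.sublock.count"
        DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT); // 1000L
    DataSetSubLockStrategy strategy = new ModDataSetSubLockStrategy(subLockCount);

Second, the two initReplicaRecovery hunks are not a mechanical substitution: they widen the scope from a DIR-level sub lock to the VOLUME-level write lock, coarsening locking on the recovery path, which is worth calling out explicitly in review.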