HDFS-17683. Add metrics for acquiring dataset read/write lock. #7211

Open · wants to merge 11 commits into trunk
hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -534,6 +534,10 @@ Each metrics record contains tags such as SessionId and Hostname as additional information along with metrics.
 | `ProcessedCommandsOpNumOps` | Total number of processed commands operations |
 | `ProcessedCommandsOpAvgTime` | Average time of processed commands operations in milliseconds |
 | `NullStorageBlockReports` | Number of blocks in IBRs that failed due to null storage |
+| `AcquireDatasetReadLockNumOps` | Total number of dataset read lock acquisitions |
+| `AcquireDatasetReadLockAvgTime` | Average time to acquire the dataset read lock, in nanoseconds |
+| `AcquireDatasetWriteLockNumOps` | Total number of dataset write lock acquisitions |
+| `AcquireDatasetWriteLockAvgTime` | Average time to acquire the dataset write lock, in nanoseconds |
 
 FsVolume
 --------
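Reviewer note: once a DataNode is running, the four new counters can be spot-checked through its JMX servlet. A minimal probe follows; it is a sketch only -- the localhost host, the 9864 default HTTP port, and the DataNodeActivity bean name pattern are assumptions about a default deployment:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    // Hypothetical smoke check: dump the JMX attributes that carry the new
    // lock metrics. Assumes a DataNode on localhost with the default HTTP
    // port (9864) and the usual DataNodeActivity bean naming.
    public class LockMetricsProbe {
      public static void main(String[] args) throws Exception {
        URL url = new URL(
            "http://localhost:9864/jmx?qry=Hadoop:service=DataNode,name=DataNodeActivity*");
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            // The MutableRate fields surface as AcquireDatasetReadLockNumOps,
            // AcquireDatasetReadLockAvgTime, and their write-lock counterparts.
            if (line.contains("AcquireDataset")) {
              System.out.println(line.trim());
            }
          }
        }
      }
    }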
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -512,7 +512,7 @@ private static Tracer createTracer(Configuration conf) {
     this.pipelineSupportSlownode = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.dnConf = new DNConf(this);
-    this.dataSetLockManager = new DataSetLockManager(conf);
+    this.dataSetLockManager = new DataSetLockManager(conf, this);
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -535,7 +535,7 @@ private static Tracer createTracer(Configuration conf) {
     super(conf);
     this.tracer = createTracer(conf);
     this.fileIoProvider = new FileIoProvider(conf, this);
-    this.dataSetLockManager = new DataSetLockManager(conf);
+    this.dataSetLockManager = new DataSetLockManager(conf, this);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
@@ -27,6 +27,7 @@
 import java.util.Stack;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
   private boolean isFair = true;
   private final boolean openLockTrace;
   private Exception lastException;
+  private DataNode datanode;
 
   /**
    * Class for maintain lockMap and is thread safe.
@@ -136,17 +138,18 @@ public boolean shouldClear() {
     }
   }
 
-  public DataSetLockManager(Configuration conf) {
+  public DataSetLockManager() {
+    this.openLockTrace = true;
+  }
+
+  public DataSetLockManager(Configuration conf, DataNode dn) {
     this.isFair = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
         DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT);
     this.openLockTrace = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE,
         DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT);
-  }
-
-  public DataSetLockManager() {
-    this.openLockTrace = true;
+    this.datanode = dn;
   }
 
   @Override
@@ -185,6 +188,7 @@ public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
    * Return a not null ReadLock.
    */
   private AutoCloseDataSetLock getReadLock(LockLevel level, String... resources) {
+    long startTimeNanos = Time.monotonicNowNanos();
     String lockName = generateLockName(level, resources);
     AutoCloseDataSetLock lock = lockMap.getReadLock(lockName);
     if (lock == null) {
@@ -197,13 +201,17 @@ private AutoCloseDataSetLock getReadLock(LockLevel level, String... resources) {
     if (openLockTrace) {
       putThreadName(getThreadName());
     }
+    if (datanode != null) {
+      datanode.metrics.addAcquireDataSetReadLock(Time.monotonicNowNanos() - startTimeNanos);
+    }
     return lock;
   }
 
   /**
    * Return a not null WriteLock.
    */
   private AutoCloseDataSetLock getWriteLock(LockLevel level, String... resources) {
+    long startTimeNanos = Time.monotonicNowNanos();
     String lockName = generateLockName(level, resources);
     AutoCloseDataSetLock lock = lockMap.getWriteLock(lockName);
     if (lock == null) {
@@ -216,6 +224,9 @@ private AutoCloseDataSetLock getWriteLock(LockLevel level, String... resources) {
     if (openLockTrace) {
       putThreadName(getThreadName());
     }
+    if (datanode != null) {
+      datanode.metrics.addAcquireDataSetWriteLock(Time.monotonicNowNanos() - startTimeNanos);
+    }
     return lock;
   }
 
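Reviewer note: every public readLock()/writeLock() call funnels through getReadLock()/getWriteLock(), so the recorded nanoseconds cover both the lock-map lookup and the wait on the underlying ReentrantReadWriteLock. A caller-side sketch of the pattern being timed; the import paths and the LockLevel.BLOCK_POOl constant (spelled with a lowercase l in trunk) are assumptions taken from the current codebase:

    import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
    import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
    import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

    public class LockUsageSketch {
      // Typical acquire/release pattern: the lock comes back as an
      // AutoCloseDataSetLock, so try-with-resources releases it, and the
      // interval measured in getReadLock() ends once the lock is held.
      void readUnderDatasetLock(DataSetLockManager mgr, String bpid) {
        try (AutoCloseDataSetLock l = mgr.readLock(LockLevel.BLOCK_POOl, bpid)) {
          // ... inspect replica state while holding the dataset read lock ...
        }
      }
    }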
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -218,6 +218,10 @@ public class DataNodeMetrics {
   @Metric("Milliseconds spent on calling NN rpc")
   private MutableRatesWithAggregation
       nnRpcLatency = registry.newRatesWithAggregation("nnRpcLatency");
+  @Metric("Nanoseconds spent acquiring the dataset write lock")
+  private MutableRate acquireDatasetWriteLock;
+  @Metric("Nanoseconds spent acquiring the dataset read lock")
+  private MutableRate acquireDatasetReadLock;
 
   final String name;
   JvmMetrics jvmMetrics = null;
@@ -817,4 +821,12 @@ public void incrReplaceBlockOpToOtherHost() {
   public void incrNullStorageBlockReports() {
     nullStorageBlockReports.incr();
   }
+
+  public void addAcquireDataSetReadLock(long latencyNanos) {
+    acquireDatasetReadLock.add(latencyNanos);
+  }
+
+  public void addAcquireDataSetWriteLock(long latencyNanos) {
+    acquireDatasetWriteLock.add(latencyNanos);
+  }
 }
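Reviewer note: a single MutableRate field expands into the NumOps/AvgTime pair documented in Metrics.md above; each add() increments the operation count and folds the sample into the running average. A standalone illustration with a hypothetical registry and metric name:

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableRate;

    public class RateDemo {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("demo");
        MutableRate rate = registry.newRate("acquireDatasetReadLock");
        rate.add(1200);  // first sample: 1200 ns
        rate.add(800);   // second sample: 800 ns
        // A snapshot now reports AcquireDatasetReadLockNumOps = 2 and
        // AcquireDatasetReadLockAvgTime = 1000; the unit is whatever the
        // caller passes to add() -- nanoseconds in this PR.
      }
    }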
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
 import static org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -816,4 +817,23 @@ public Boolean get() {
       }, 100, 10000);
     }
   }
+
+  @Test
+  public void testDataNodeDatasetLockMetrics() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
+      FileSystem fs = cluster.getFileSystem();
+      // Create and read a one-byte file so both lock paths are exercised.
+      Path tmpfile = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, tmpfile,
+          1L, (short) 1, 1L);
+      DFSTestUtil.readFile(fs, tmpfile);
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+      assertCounterGt("AcquireDatasetWriteLockNumOps", 1L, rb);
+      assertCounterGt("AcquireDatasetReadLockNumOps", 1L, rb);
+    }
+  }
 }
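A possible follow-up assertion in the same style, sketched to slot into the try block above (it reuses the test's fs, tmpfile, datanode, and rb variables), would check that further reads keep the counter climbing:

    // Hypothetical extra check: a second read should push the read-lock
    // op count past whatever the first read left it at.
    long before = getLongCounter("AcquireDatasetReadLockNumOps", rb);
    DFSTestUtil.readFile(fs, tmpfile);
    MetricsRecordBuilder rbAfter = getMetrics(datanode.getMetrics().name());
    assertCounterGt("AcquireDatasetReadLockNumOps", before, rbAfter);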