Skip to content

Commit

Permalink
HDFS-17208. Add the metrics PendingAsyncDiskOperations in datanode (#…
Browse files Browse the repository at this point in the history
…6109). Contributed by Haiyang Hu.

Reviewed-by: Tao Li <[email protected]>
Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
haiyang1987 authored Oct 12, 2023
1 parent 5c22934 commit 0ed484a
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ synchronized void removeVolume(String storageId) {
executors.remove(storageId);
}
}


/**
* The count of pending and running asynchronous disk operations,
* include deletion of block files and requesting sync_file_range() operations.
*/
synchronized long countPendingDeletions() {
long count = 0;
for (ThreadPoolExecutor exec : executors.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3822,5 +3822,10 @@ public long getLastDirScannerFinishTime() {
public void setLastDirScannerFinishTime(long time) {
this.lastDirScannerFinishTime = time;
}

@Override
public long getPendingAsyncDeletions() {
return asyncDiskService.countPendingDeletions();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public static void getMetrics(MetricsCollector collector,
" blocks failed in cache eviction"),
beanClass.getNumBlocksFailedToUncache())
.addGauge(Interns.info("LastDirectoryScannerFinishTime",
"Finish time of the last directory scan"), beanClass.getLastDirScannerFinishTime());
"Finish time of the last directory scan"), beanClass.getLastDirScannerFinishTime())
.addGauge(Interns.info("PendingAsyncDeletions",
"The count of pending and running asynchronous disk operations"),
beanClass.getPendingAsyncDeletions());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,9 @@ public interface FSDatasetMBean extends MetricsSource {
* Returns the last time in milliseconds when the directory scanner successfully ran.
*/
long getLastDirScannerFinishTime();

/**
* Returns the count of pending and running asynchronous disk operations.
*/
long getPendingAsyncDeletions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,11 @@ public long getLastDirScannerFinishTime() {
return 0L;
}

@Override
public long getPendingAsyncDeletions() {
return 0;
}

/**
* Get metrics from the metrics source
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,4 +483,8 @@ public long getLastDirScannerFinishTime() {
return 0L;
}

@Override
public long getPendingAsyncDeletions() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;
Expand Down Expand Up @@ -131,6 +132,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;

public class TestFsDatasetImpl {

private static final Logger LOG = LoggerFactory.getLogger(
Expand Down Expand Up @@ -1842,7 +1851,8 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
*/
@Test
public void testAysncDiskServiceDeleteReplica()
throws IOException, InterruptedException, TimeoutException {
throws IOException, InterruptedException, TimeoutException, MalformedObjectNameException,
ReflectionException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
HdfsConfiguration config = new HdfsConfiguration();
// Bump up replication interval.
config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
Expand Down Expand Up @@ -1896,13 +1906,30 @@ public void delayDeleteReplica() {
// If this replica is deleted from memory, the client would got an ReplicaNotFoundException.
assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));

assertEquals(1, ds.asyncDiskService.countPendingDeletions());
assertEquals(1, ds.getPendingAsyncDeletions());

// Validate PendingAsyncDeletions metrics.
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service=DataNode,name=FSDatasetState-" + dn.getDatanodeUuid());
long pendingAsyncDeletions = (long) mbs.getAttribute(mxbeanName,
"PendingAsyncDeletions");
assertEquals(1, pendingAsyncDeletions);

// Make it resume the removeReplicaFromMem method.
semaphore.release(1);

// Waiting for the async deletion task finish.
GenericTestUtils.waitFor(() ->
ds.asyncDiskService.countPendingDeletions() == 0, 100, 1000);

assertEquals(0, ds.getPendingAsyncDeletions());

pendingAsyncDeletions = (long) mbs.getAttribute(mxbeanName,
"PendingAsyncDeletions");
assertEquals(0, pendingAsyncDeletions);

// Sleep for two heartbeat times.
Thread.sleep(config.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
Expand Down

0 comments on commit 0ed484a

Please sign in to comment.