Skip to content

Commit

Permalink
PR comments, further refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Nov 24, 2024
1 parent c909e78 commit 532adec
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,61 @@

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;

/**
* FileSystemCounter is an enum for defining which filesystem/storage statistics are exposed in Tez.
*/
@Private
public enum FileSystemCounter {
BYTES_READ("bytesRead"),
BYTES_WRITTEN("bytesWritten"),
READ_OPS("readOps"),
LARGE_READ_OPS("largeReadOps"),
WRITE_OPS("writeOps"),
HDFS_BYTES_READ("hdfsBytesRead"),
HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
FILE_BYTES_READ("fileBytesRead"),
FILE_BYTES_WRITTEN("fileBytesWritten"),

// Additional counters from HADOOP-13305
// Additional counters from HADOOP-13305
OP_APPEND(CommonStatisticNames.OP_APPEND),
OP_COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE),
OP_CREATE(CommonStatisticNames.OP_CREATE),
OP_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE),
OP_DELETE(CommonStatisticNames.OP_DELETE),
OP_EXISTS(CommonStatisticNames.OP_EXISTS),
OP_GET_CONTENT_SUMMARY(CommonStatisticNames.OP_GET_CONTENT_SUMMARY),
OP_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN),
OP_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),
OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS),
OP_GET_STATUS(CommonStatisticNames.OP_GET_STATUS),
OP_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS),
OP_IS_FILE(CommonStatisticNames.OP_IS_FILE),
OP_IS_DIRECTORY(CommonStatisticNames.OP_IS_DIRECTORY),
OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES),
OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
OP_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS),
OP_MKDIRS(CommonStatisticNames.OP_MKDIRS),
OP_MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES),
OP_OPEN(CommonStatisticNames.OP_OPEN),
OP_REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL),
OP_REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES),
OP_REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL),
OP_RENAME(CommonStatisticNames.OP_RENAME),
OP_SET_ACL(CommonStatisticNames.OP_SET_ACL),
OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
OP_GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations");
OP_SET_TIMES(CommonStatisticNames.OP_SET_TIMES),
OP_TRUNCATE(CommonStatisticNames.OP_TRUNCATE),

// counters below are not needed in production, as the scheme_countername expansion is taken care of by the
// FileSystemCounterGroup, the only reason they are here is that some analyzers still depend on them
@Deprecated
HDFS_BYTES_READ("hdfsBytesRead"),
@Deprecated
HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
@Deprecated
FILE_BYTES_READ("fileBytesRead"),
@Deprecated
FILE_BYTES_WRITTEN("fileBytesWritten");

private final String opName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public void updateCounters() {
Iterator<StorageStatistics> iter = globalStorageStatistics.iterator();
while (iter.hasNext()) {
StorageStatistics stats = iter.next();
if (!statisticUpdaters.containsKey(stats.getScheme())) {
Map<String, FileSystemStatisticUpdater> updaterSet = new TreeMap<>();
statisticUpdaters.put(stats.getScheme(), updaterSet);
}
FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme())
// Fetch or initialize the updater set for the scheme
Map<String, FileSystemStatisticUpdater> updaterSet = statisticUpdaters
.computeIfAbsent(stats.getScheme(), k -> new TreeMap<>());
// Fetch or create the updater for the specific statistic
FileSystemStatisticUpdater updater = updaterSet
.computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats));
updater.updateCounters();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
Expand All @@ -42,7 +43,7 @@ public class TestFileSystemStatisticUpdater {

private static MiniDFSCluster dfsCluster;

private static Configuration conf = new Configuration();
private static final Configuration conf = new Configuration();
private static FileSystem remoteFs;

private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
Expand All @@ -52,8 +53,7 @@ public class TestFileSystemStatisticUpdater {
public static void setup() throws IOException {
try {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
.build();
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
Expand All @@ -73,14 +73,11 @@ public void basicTest() throws IOException {
TezCounters counters = new TezCounters();
TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");

remoteFs.mkdirs(new Path("/tmp/foo/"));
FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
out.writeBytes("xyz");
out.close();
DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc.txt"), "xyz");

updater.updateCounters();

LOG.info("Counters: " + counters);
LOG.info("Counters: {}", counters);
TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(),
FileSystemCounter.OP_MKDIRS);
TezCounter createCounter = counters.findCounter(remoteFs.getScheme(),
Expand All @@ -90,26 +87,20 @@ public void basicTest() throws IOException {
Assert.assertEquals(1, mkdirCounter.getValue());
Assert.assertEquals(1, createCounter.getValue());

FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt"));
out1.writeBytes("xyz");
out1.close();
DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc1.txt"), "xyz");

long oldCreateVal = createCounter.getValue();
updater.updateCounters();

LOG.info("Counters: " + counters);
LOG.info("Counters: {}", counters);
Assert.assertTrue("Counter not updated, old=" + oldCreateVal
+ ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal);

oldCreateVal = createCounter.getValue();
// Ensure all numbers are reset
remoteFs.clearStatistics();
FileSystem.clearStatistics();
updater.updateCounters();
LOG.info("Counters: " + counters);
LOG.info("Counters: {}", counters);
Assert.assertEquals(oldCreateVal, createCounter.getValue());

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@

public class TestTaskCounterUpdater {

private static final Logger LOG = LoggerFactory.getLogger(
TestTaskCounterUpdater.class);
private static Configuration conf = new Configuration();
private static final Logger LOG = LoggerFactory.getLogger(TestTaskCounterUpdater.class);
private static final Configuration conf = new Configuration();

@Test
public void basicTest() {
TezCounters counters = new TezCounters();
TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");

updater.updateCounters();
LOG.info("Counters: " + counters);
LOG.info("Counters: {}", counters);
TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);

Expand All @@ -49,11 +48,8 @@ public void basicTest() {
Assert.assertTrue(cpuCounter.getValue() > 0);

updater.updateCounters();
LOG.info("Counters: " + counters);
LOG.info("Counters: {}", counters);
Assert.assertTrue("Counter not updated, old=" + oldVal
+ ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal);

}


}

0 comments on commit 532adec

Please sign in to comment.