From e4d070801f8009c231d7648f2d573095c6e3f651 Mon Sep 17 00:00:00 2001
From: Alexander Bogdanov
Date: Wed, 25 Oct 2023 14:36:37 +0300
Subject: [PATCH] HDFS-17221. Add NNThroughputBenchmark params to specify subcluster load in RBF.

* Added the ability to specify a base directory per operation and to run
  benchmarks in parallel in different directories;
* Added a unit test for 3 directories.
---
 .../src/site/markdown/Benchmarking.md         |   5 +-
 .../namenode/NNThroughputBenchmark.java       | 251 ++++++++++--------
 .../namenode/TestNNThroughputBenchmark.java   |  42 ++-
 3 files changed, 187 insertions(+), 111 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Benchmarking.md b/hadoop-common-project/hadoop-common/src/site/markdown/Benchmarking.md
index 2449ab5cdeda5..2c4f2cb9047be 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Benchmarking.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Benchmarking.md
@@ -30,7 +30,10 @@ The benchmark first generates inputs for each thread so that the input generatio
 
 The general command line syntax is:
 
-`hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark [genericOptions] [commandOptions]`
+`hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark [genericOptions] [commandOptions] ... [genericOptions] [commandOptions]`
+
+Each commandOptions group starts with `-op`, so several operations may be specified in one run.
+Operations with different `-baseDirName` values run in parallel, and separate statistics are printed per `-baseDirName`.
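+
+For example, the following command (directory names here are purely illustrative) benchmarks `create` and `mkdirs` concurrently, each against its own base directory:
+
+`hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark -op create -baseDirName /dir1 -files 100 -op mkdirs -baseDirName /dir2 -dirs 100`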
 
 #### Generic Options
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index a4e88d759fb4e..61bc84aa9c21c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -26,11 +26,14 @@
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
-
-import org.apache.hadoop.util.Preconditions;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
@@ -80,12 +83,12 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
-import org.slf4j.event.Level;
 
 /**
  * Main class for a series of name-node benchmarks.
@@ -162,11 +165,12 @@ static void setNameNodeLoggingLevel(Level logLevel) {
    * specific name-node operation.
    */
   abstract class OperationStatsBase {
-    private String baseDirName = "/nnThroughputBenchmark";
+    protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
     protected static final String OP_ALL_NAME = "all";
     protected static final String OP_ALL_USAGE = "-op all ";
     private String baseDir;
+    private String baseDirName;
     protected short replication;
     protected int blockSize;
     protected int numThreads = 0;        // number of threads
@@ -228,7 +232,6 @@ abstract class OperationStatsBase {
     abstract void printResults();
 
     OperationStatsBase() {
-      baseDir = baseDirName + "/" + getOpName();
       replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
       blockSize = config.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
       numOpsRequired = 10;
@@ -295,12 +298,7 @@ void cleanUp() throws IOException {
         clientProto.saveNamespace(0, 0);
       }
     }
-    public String getBaseDirName() {
-      return baseDirName;
-    }
 
-    public void setBaseDirName(String baseDirName) {
-      this.baseDirName = baseDirName;
-    }
+
     int getNumOpsExecuted() {
       return numOpsExecuted;
     }
@@ -322,12 +320,14 @@ long getAverageTime() {
     }
 
     public String getBaseDir() {
-      setBaseDir(baseDirName + "/" + getOpName());
       return baseDir;
     }
 
-    public void setBaseDir(String baseDir) {
-      this.baseDir = baseDir;
+    /**
+     * Base directory name specified via -baseDirName; the operation itself
+     * executes in the subdirectory baseDirName + "/" + getOpName().
+     */
+    String getBaseDirName() {
+      return baseDirName;
     }
 
     String getClientName(int idx) {
@@ -377,6 +377,22 @@ protected boolean verifyOpArgument(List<String> args) {
         args.remove(ugrcIndex);
       }
 
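+      // -baseDirName is parsed here, in the common argument handling, so every
+      // operation accepts it; a relative name is normalized to an absolute path.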
+      int baseDirNameIndex = args.indexOf("-baseDirName");
+      if (baseDirNameIndex >= 0) {
+        if (args.size() <= baseDirNameIndex + 1) {
+          printUsage();
+        }
+        this.baseDirName = args.get(baseDirNameIndex + 1);
+        if (!this.baseDirName.startsWith("/")) {
+          this.baseDirName = "/" + this.baseDirName;
+        }
+        args.remove(baseDirNameIndex + 1);
+        args.remove(baseDirNameIndex);
+      } else {
+        this.baseDirName = BASE_DIR_NAME;
+      }
+      baseDir = this.baseDirName + "/" + getOpName();
+
       String type = args.get(1);
       if(OP_ALL_NAME.equals(type)) {
         type = getOpName();
@@ -388,11 +404,11 @@ protected boolean verifyOpArgument(List<String> args) {
     }
 
     void printStats() {
-      LOG.info("--- " + getOpName() + " stats ---");
-      LOG.info("# operations: " + getNumOpsExecuted());
-      LOG.info("Elapsed Time: " + getElapsedTime());
-      LOG.info(" Ops per sec: " + getOpsPerSecond());
-      LOG.info("Average Time: " + getAverageTime());
+      LOG.info("--- {} in {} stats ---", getOpName(), getBaseDir());
+      LOG.info("# operations: {}", getNumOpsExecuted());
+      LOG.info("Elapsed Time: {}", getElapsedTime());
+      LOG.info(" Ops per sec: {}", getOpsPerSecond());
+      LOG.info("Average Time: {}", getAverageTime());
     }
   }
 
@@ -504,7 +520,7 @@ long executeOp(int daemonId, int inputIdx, String ignore)
       clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
          false);
       long start = Time.now();
-      clientProto.delete(getBaseDirName(), true);
+      clientProto.delete(this.getBaseDirName(), true);
       long end = Time.now();
       return end-start;
     }
@@ -512,7 +528,7 @@
     @Override
     void printResults() {
       LOG.info("--- " + getOpName() + " inputs ---");
-      LOG.info("Remove directory " + getBaseDirName());
+      LOG.info("Remove directory " + this.getBaseDirName());
       printStats();
     }
   }
@@ -537,7 +553,7 @@ class CreateFileStats extends OperationStatsBase {
 
     CreateFileStats(List<String> args) {
       super();
-      parseArguments(args);
+      parseArguments(new ArrayList<>(args));
     }
 
     @Override
@@ -551,7 +567,7 @@ void parseArguments(List<String> args) {
       int nrFilesPerDir = 4;
       closeUponCreate = false;
       for (int i = 2; i < args.size(); i++) {       // parse command line
-        if(args.get(i).equals("-files")) {
+        if (args.get(i).equals("-files")) {
           if(i+1 == args.size())  printUsage();
           numOpsRequired = Integer.parseInt(args.get(++i));
         } else if (args.get(i).equals("-blockSize")) {
@@ -563,11 +579,6 @@ void parseArguments(List<String> args) {
       } else if(args.get(i).equals("-filesPerDir")) {
         if(i+1 == args.size())  printUsage();
         nrFilesPerDir = Integer.parseInt(args.get(++i));
-      } else if(args.get(i).equals("-baseDirName")) {
-        if (i + 1 == args.size()) {
-          printUsage();
-        }
-        setBaseDirName(args.get(++i));
       } else if(args.get(i).equals("-close")) {
         closeUponCreate = true;
       } else if(!ignoreUnrelatedOptions)
        printUsage();
@@ -681,11 +692,6 @@ void parseArguments(List<String> args) {
       } else if(args.get(i).equals("-dirsPerDir")) {
         if(i+1 == args.size())  printUsage();
         nrDirsPerDir = Integer.parseInt(args.get(++i));
-      } else if(args.get(i).equals("-baseDirName")) {
-        if (i + 1 == args.size()) {
-          printUsage();
-        }
-        setBaseDirName(args.get(++i));
       } else if(!ignoreUnrelatedOptions)
        printUsage();
     }
@@ -1175,7 +1181,7 @@ class BlockReportStats extends OperationStatsBase {
       this.blocksPerFile = 10;
       // set heartbeat interval to 3 min, so that expiration were 40 min
       config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3 * 60);
-      parseArguments(args);
+      parseArguments(new ArrayList<>(args));
       // adjust replication to the number of data-nodes
       this.replication = (short)Math.min(replication, getNumDatanodes());
     }
@@ -1211,11 +1217,6 @@ void parseArguments(List<String> args) {
       } else if (args.get(i).equals("-blockSize")) {
         if(i+1 == args.size())  printUsage();
         blockSize = Integer.parseInt(args.get(++i));
-      } else if(args.get(i).equals("-baseDirName")) {
-        if (i + 1 == args.size()) {
-          printUsage();
-        }
-        setBaseDirName(args.get(++i));
       } else if(!ignoreUnrelatedOptions)
        printUsage();
     }
@@ -1441,11 +1442,6 @@ void parseArguments(List<String> args) {
       } else if (args.get(i).equals("-blockSize")) {
         if(i+1 == args.size())  printUsage();
         blockSize = Integer.parseInt(args.get(++i));
-      } else if(args.get(i).equals("-baseDirName")) {
-        if (i + 1 == args.size()) {
-          printUsage();
-        }
-        setBaseDirName(args.get(++i));
       } else if(!ignoreUnrelatedOptions)
        printUsage();
     }
@@ -1579,68 +1575,88 @@ public static void runBenchmark(Configuration conf, String[] args)
    */
   @Override // Tool
   public int run(String[] aArgs) throws Exception {
-    List<String> args = new ArrayList<String>(Arrays.asList(aArgs));
-    if(args.size() < 2 || !args.get(0).startsWith("-op"))
-      printUsage();
+    List<String> plainArgs = new ArrayList<>(Arrays.asList(aArgs));
+    List<List<String>> operationsArgs = new ArrayList<>();
+    int startIndex = 0;
+    int curIndex = startIndex;
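+    // Split the raw argument list into one sub-list per operation:
+    // each argument starting with "-op" opens a new group.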
+    for (String arg : plainArgs) {
+      if (arg.startsWith("-op") && curIndex != 0) {
+        operationsArgs.add(plainArgs.subList(startIndex, curIndex));
+        startIndex = curIndex;
+      }
+      curIndex++;
+      if (curIndex == plainArgs.size()) {
+        operationsArgs.add(plainArgs.subList(startIndex, curIndex));
+      }
+    }
 
-    String type = args.get(1);
-    boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
+    for (List<String> args : operationsArgs) {
+      if (args.size() < 2 || !args.get(0).startsWith("-op")) {
+        printUsage();
+      }
+    }
 
+    List<OperationStatsBase> ops = new ArrayList<>();
     final URI nnUri = FileSystem.getDefaultUri(config);
-    // Start the NameNode
-    String[] argv = new String[] {};
-    List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
-    OperationStatsBase opStat = null;
     try {
-      if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
-        opStat = new CreateFileStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || MkdirsStats.OP_MKDIRS_NAME.equals(type)) {
-        opStat = new MkdirsStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
-        opStat = new OpenFileStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || DeleteFileStats.OP_DELETE_NAME.equals(type)) {
-        opStat = new DeleteFileStats(args);
-        ops.add(opStat);
-      }
-      if (runAll || AppendFileStats.OP_APPEND_NAME.equals(type)) {
-        opStat = new AppendFileStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || FileStatusStats.OP_FILE_STATUS_NAME.equals(type)) {
-        opStat = new FileStatusStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || RenameFileStats.OP_RENAME_NAME.equals(type)) {
-        opStat = new RenameFileStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
-        opStat = new BlockReportStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) {
-        if (nnUri.getScheme() != null && nnUri.getScheme().equals("hdfs")) {
-          LOG.warn("The replication test is ignored as it does not support " +
-              "standalone namenode in another process or on another host. ");
-        } else {
-          opStat = new ReplicationStats(args);
-          ops.add(opStat);
-        }
-      }
-      if(runAll || CleanAllStats.OP_CLEAN_NAME.equals(type)) {
-        opStat = new CleanAllStats(args);
-        ops.add(opStat);
-      }
-      if (ops.isEmpty()) {
-        printUsage();
-      }
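+      // Each operation parses a fresh copy of its argument list, since
+      // argument parsing removes the options it consumes.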
"); - } else { - opStat = new ReplicationStats(args); + for (List args : operationsArgs) { + String type = args.get(1); + boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type); + OperationStatsBase opStat = null; + + if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) { + opStat = new CreateFileStats(new ArrayList<>(args)); ops.add(opStat); } + if(runAll || MkdirsStats.OP_MKDIRS_NAME.equals(type)) { + opStat = new MkdirsStats(new ArrayList<>(args)); + ops.add(opStat); + } + if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) { + opStat = new OpenFileStats(new ArrayList<>(args)); + ops.add(opStat); + } + if(runAll || DeleteFileStats.OP_DELETE_NAME.equals(type)) { + opStat = new DeleteFileStats(new ArrayList<>(args)); + ops.add(opStat); + } + if (runAll || AppendFileStats.OP_APPEND_NAME.equals(type)) { + opStat = new AppendFileStats(new ArrayList<>(args)); + ops.add(opStat); + } + if(runAll || FileStatusStats.OP_FILE_STATUS_NAME.equals(type)) { + opStat = new FileStatusStats(new ArrayList<>(args)); + ops.add(opStat); + } + if(runAll || RenameFileStats.OP_RENAME_NAME.equals(type)) { + opStat = new RenameFileStats(new ArrayList<>(args)); + ops.add(opStat); + } + if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) { + opStat = new BlockReportStats(new ArrayList<>(args)); + ops.add(opStat); + } + if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) { + if (nnUri.getScheme() != null && nnUri.getScheme().equals("hdfs")) { + LOG.warn("The replication test is ignored as it does not support " + + "standalone namenode in another process or on another host. "); + } else { + opStat = new ReplicationStats(new ArrayList<>(args)); + ops.add(opStat); + } + } + if(runAll || CleanAllStats.OP_CLEAN_NAME.equals(type)) { + opStat = new CleanAllStats(new ArrayList<>(args)); + ops.add(opStat); + } + if (ops.isEmpty()) { + printUsage(); + } } - if(runAll || CleanAllStats.OP_CLEAN_NAME.equals(type)) { - opStat = new CleanAllStats(args); - ops.add(opStat); - } - if (ops.isEmpty()) { - printUsage(); - } + + // Start the NameNode + String[] argv = new String[] {}; if (nnUri.getScheme() == null || nnUri.getScheme().equals("file")) { LOG.info("Remote NameNode is not specified. 
+      CountDownLatch latch = new CountDownLatch(operationsByBaseDirs.size());
+      for (Map.Entry<String, List<OperationStatsBase>> opsByDir : operationsByBaseDirs.entrySet()) {
+        new Thread(() -> {
+          try {
+            // run each benchmark
+            for (OperationStatsBase op : opsByDir.getValue()) {
+              LOG.info("Starting benchmark {} in {}", op.getOpName(), opsByDir.getKey());
+              op.benchmark();
+              op.cleanUp();
+            }
+            // print statistics for this group only
+            for (OperationStatsBase op : opsByDir.getValue()) {
+              LOG.info("");
+              op.printResults();
+              LOG.info("");
+            }
+          } catch (Exception e) {
+            LOG.error(StringUtils.stringifyException(e));
+          } finally {
+            latch.countDown();
+          }
+        }).start();
       }
+      // wait for all of the parallel running benchmarks to finish
+      latch.await();
     } catch(Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw e;

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
index a9836e0003595..e44af32e0cd7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
@@ -190,6 +190,43 @@ public void testNNThroughputForBlockReportOp() throws Exception {
     }
   }
 
+  /**
+   * This test runs the 3 scenarios below in parallel:
+   * 1. create benchmark in the /nnThroughputBenchmark1 dir;
+   * 2. mkdirs benchmark in the /nnThroughputBenchmark2 dir;
+   * 3. all benchmarks in the /nnThroughputBenchmark3 dir.
+   */
+  @Test(timeout = 120000)
+  public void testNNBenchmarkRunningInMultipleDirectoriesInParallel() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 16);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      cluster.waitActive();
+      final Configuration benchConf = new HdfsConfiguration();
+      benchConf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 16);
+      FileSystem.setDefaultUri(benchConf, cluster.getURI());
+      DistributedFileSystem fs = cluster.getFileSystem();
+
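+      // Three -op groups with distinct -baseDirName values; each group should
+      // run in its own thread against its own base directory.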
+ */ + @Test(timeout = 120000) + public void testNNBenchmarkRunningInMultipleDirectoriesInParallel() throws Exception { + final Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 16); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitActive(); + final Configuration benchConf = new HdfsConfiguration(); + benchConf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 16); + FileSystem.setDefaultUri(benchConf, cluster.getURI()); + DistributedFileSystem fs = cluster.getFileSystem(); + + NNThroughputBenchmark.runBenchmark(benchConf, + new String[] { + "-op", "create", "-baseDirName", "nnThroughputBenchmark1", "-files", "3", + "-op", "mkdirs", "-keepResults", "-baseDirName", "nnThroughputBenchmark2", + "-op", "all", "-keepResults", "-baseDirName", "nnThroughputBenchmark3"}); + + Assert.assertTrue(fs.exists(new Path("/nnThroughputBenchmark1"))); + Assert.assertTrue(fs.exists(new Path("/nnThroughputBenchmark2"))); + Assert.assertFalse("Remove operation is not executed for -op all!", + fs.exists(new Path("/nnThroughputBenchmark3"))); + Assert.assertFalse(fs.exists(new Path("/nnThroughputBenchmark"))); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * This test runs {@link NNThroughputBenchmark} against a mini DFS cluster * with explicit -baseDirName option. @@ -214,8 +251,9 @@ public void testNNThroughputWithBaseDir() throws Exception { Assert.assertFalse(fs.exists(new Path("/nnThroughputBenchmark"))); NNThroughputBenchmark.runBenchmark(benchConf, - new String[] {"-op", "all", "-baseDirName", "/nnThroughputBenchmark1"}); - Assert.assertTrue(fs.exists(new Path("/nnThroughputBenchmark1"))); + new String[] {"-op", "all", "-keepResults", "-baseDirName", "/nnThroughputBenchmark2"}); + Assert.assertFalse("Remove operation is not executed for -op all!", + fs.exists(new Path("/nnThroughputBenchmark2"))); Assert.assertFalse(fs.exists(new Path("/nnThroughputBenchmark"))); } finally { if (cluster != null) {