Skip to content

Commit

Permalink
HDFS-17506. [FGL] Performance for phase 1 (#6806)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZanderXu authored May 13, 2024
1 parent 14153f0 commit 141b190
Show file tree
Hide file tree
Showing 2 changed files with 426 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.namenode.fgl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

/**
* This class benchmarks the throughput of NN for both global-lock and fine-grained lock.
* Using some common used RPCs, such as create,addBlock,complete,append,rename,
* delete,setPermission,setOwner,setReplication,getFileInfo,getListing,getBlockLocation,
* to build some tasks according to readWrite ratio and testing count.
* Then create a thread pool with a concurrency of the numClient to perform these tasks.
* The performance difference between Global Lock and Fine-grained Lock can be
* obtained according to the execution time.
*/
public class FSNLockBenchmarkThroughput extends Configured implements Tool {

private final FileSystem fileSystem;

public FSNLockBenchmarkThroughput(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}

public void benchmark(Path basePath, int readWriteRatio, int testingCount,
int numClients) throws Exception {
// private final
ArrayList<Path> readingPaths = new ArrayList<>();
for (int i = 0; i < 30; i++) {
Path path = new Path(basePath, "reading_" + i);
internalWriteFile(this.fileSystem, path, true, null);
readingPaths.add(path);
}

HashMap<String, Integer> detailInfo = new HashMap<>();

// building tasks.
List<Callable<Void>> tasks = buildTasks(this.fileSystem,
basePath, testingCount, readWriteRatio, readingPaths, detailInfo);
Collections.shuffle(tasks);

long startTime = System.currentTimeMillis();

ExecutorService executors = Executors.newFixedThreadPool(numClients);
try {
// Submit all tasks.
List<Future<Void>> futures = executors.invokeAll(tasks);

// Waiting result
for (Future<Void> f : futures) {
f.get();
}

long endTime = System.currentTimeMillis();
// Print result.
System.out.println("The Benchmark result is: " + tasks.size()
+ " tasks with readWriteRatio " + readWriteRatio + " completed, taking "
+ (endTime - startTime) + "(ms)");
detailInfo.forEach(
(k, v) -> System.out.println("\t operationName:" + k + ", testCount:" + v));
} finally {
executors.shutdown();
executors.shutdownNow();

for (Path path : readingPaths) {
this.fileSystem.delete(path, false);
}
}
}

// Write a little data to the path.
private void internalWriteFile(FileSystem fs, Path path, boolean writeData,
HashMap<String, Integer> detailInfo) throws IOException {
try (FSDataOutputStream outputStream = fs.create(path)) {
incOp(detailInfo, "create");
incOp(detailInfo, "complete");
byte[] data = new byte[1024];
ThreadLocalRandom.current().nextBytes(data);
if (writeData) {
outputStream.write(data);
incOp(detailInfo, "addBlock");
}
}
}

// Append a little data to the path
private void internalAppendFile(FileSystem fs, Path path, boolean writeData,
HashMap<String, Integer> detailInfo) throws IOException {
try (FSDataOutputStream outputStream = fs.append(path)) {
incOp(detailInfo, "append");
incOp(detailInfo, "complete");
byte[] data = new byte[1024];
ThreadLocalRandom.current().nextBytes(data);
if (writeData) {
outputStream.write(data);
}
}
}

/**
* Include getListing.
*/
private Callable<Void> getListing(FileSystem fs, Path path,
HashMap<String, Integer> detailInfo) {
return () -> {
fs.listStatus(path);
incOp(detailInfo, "getListing");
return null;
};
}

private synchronized void incOp(HashMap<String, Integer> detailInfo, String opName) {
if (detailInfo != null) {
int value = detailInfo.getOrDefault(opName, 0);
detailInfo.put(opName, value + 1);
}
}

/**
* Include getBlockLocation.
*/
private Callable<Void> getBlockLocation(FileSystem fs, Path path,
HashMap<String, Integer> detailInfo) {
return () -> {
fs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
incOp(detailInfo, "getBlockLocation");
return null;
};
}

/**
* Include getFileInfo.
*/
private Callable<Void> getFileInfo(FileSystem fs, Path path,
HashMap<String, Integer> detailInfo) {
return () -> {
fs.getFileStatus(path);
incOp(detailInfo, "getFileInfo");
return null;
};
}

/**
* Include create, addBlock, complete.
*/
private Callable<Void> writeAndDeleteFile(FileSystem fs, Path path,
HashMap<String, Integer> detailInfo) {
return () -> {
internalWriteFile(fs, path, false, detailInfo);
fs.delete(path, false);
incOp(detailInfo, "delete");
return null;
};
}

/**
* Include create, addBlock, complete, append, complete, rename, setPermission, setOwner,
* setReplication and delete.
*/
private Callable<Void> otherWriteOperation(FileSystem fs, Path path,
Path renameTargetPath, HashMap<String, Integer> detailInfo) {
return () -> {
// Create one file
internalWriteFile(fs, path, false, detailInfo);

// Append some data
internalAppendFile(fs, path, true, detailInfo);

// Rename
fs.rename(path, renameTargetPath);
incOp(detailInfo, "rename");

// SetPermission
fs.setPermission(renameTargetPath, FsPermission.getDirDefault());
incOp(detailInfo, "setPermission");

// SetOwner
fs.setOwner(renameTargetPath, "mock_user", "mock_group");
incOp(detailInfo, "setOwner");

// SetReplication
fs.setReplication(renameTargetPath, (short) 4);
incOp(detailInfo, "setReplication");

// Delete
fs.delete(renameTargetPath, false);
incOp(detailInfo, "delete");

return null;
};
}

/**
* Building some callable tasks according to testingCount and readWriteRatio.
*/
private List<Callable<Void>> buildTasks(FileSystem fs, Path basePath, int testingCount,
int readWriteRatio, ArrayList<Path> readingPaths, HashMap<String, Integer> detailInfo) {
List<Callable<Void>> tasks = new ArrayList<>();

for (int i = 0; i < testingCount; i++) {
// contains 4 * 10 write RPCs.
for (int j = 0; j < 10; j++) {
Path path = new Path(basePath, "write_" + i + "_" + j);
tasks.add(writeAndDeleteFile(fs, path, detailInfo));
}

// contains 10 write RPCs.
Path srcPath = new Path(basePath, "src_" + i + "_" + System.nanoTime());
Path targetPath = new Path(basePath, "target_" + i + "_" + System.nanoTime());
tasks.add(otherWriteOperation(fs, srcPath, targetPath, detailInfo));

// The number of write RPCs is 50.
for (int j = 0; j < readWriteRatio * 50; j++) {
int opNumber = ThreadLocalRandom.current().nextInt(5);
int pathIndex = ThreadLocalRandom.current().nextInt(readingPaths.size());
Path path = readingPaths.get(pathIndex);
if (opNumber <= 1) {
tasks.add(getFileInfo(fs, path, detailInfo));
} else if (opNumber <= 3) {
tasks.add(getBlockLocation(fs, path, detailInfo));
} else {
tasks.add(getListing(fs, path, detailInfo));
}
}
}
return tasks;
}

@Override
public int run(String[] args) throws Exception {
String basePath = "/tmp/fsnlock/benchmark/throughput";
int readWriteRatio = 20;
int testingCount = 100;
int numClients = 100;

if (args.length >= 4) {
basePath = args[0];

try {
readWriteRatio = Integer.parseInt(args[1]);
if (readWriteRatio <= 0) {
printUsage("Invalid readWrite ratio: " + readWriteRatio);
}
} catch (NumberFormatException e) {
printUsage("Invalid readWrite ratio: " + e.getMessage());
}

try {
testingCount = Integer.parseInt(args[2]);
if (testingCount <= 0) {
printUsage("Invalid testing count: " + testingCount);
}
} catch (NumberFormatException e) {
printUsage("Invalid testing count: " + e.getMessage());
}

try {
numClients = Integer.parseInt(args[3]);
if (numClients <= 0) {
printUsage("Invalid num of clients: " + numClients);
}
} catch (NumberFormatException e) {
printUsage("Invalid num of clients: " + e.getMessage());
}
} else {
printUsage(null);
}

benchmark(new Path(basePath), readWriteRatio, testingCount, numClients);
return 0;
}

private static void printUsage(String msg) {
if (msg != null) {
System.out.println(msg);
}
System.err.println("Usage: FSNLockBenchmarkThroughput " +
"<base path> <read write ratio> <testing count> <num clients>");
System.exit(1);
}

public static void main(String[] args) throws Exception {
Configuration conf = new HdfsConfiguration();
FileSystem fs = FileSystem.get(conf);
int res = ToolRunner.run(conf, new FSNLockBenchmarkThroughput(fs), args);
System.exit(res);
}
}
Loading

0 comments on commit 141b190

Please sign in to comment.