Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Binyang2014 committed Apr 8, 2024
1 parent b03be9a commit 695ff94
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 35 deletions.
2 changes: 1 addition & 1 deletion include/mscclpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum class PacketType {

class ExecutionPlan {
public:
ExecutionPlan(std::string planPath);
ExecutionPlan(const std::string name, const std::string planPath);
~ExecutionPlan() = default;

private:
Expand Down
3 changes: 2 additions & 1 deletion python/mscclpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ void register_executor(nb::module_& m) {

nb::enum_<PacketType>(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);

nb::class_<ExecutionPlan>(m, "ExecutionPlan").def(nb::init<std::string>(), nb::arg("planPath"));
nb::class_<ExecutionPlan>(m, "ExecutionPlan")
.def(nb::init<const std::string, const std::string>(), nb::arg("name"), nb::arg("planPath"));

nb::class_<Executor>(m, "Executor")
.def(nb::init<std::shared_ptr<Communicator>, int>(), nb::arg("comm"), nb::arg("nranksPerNode"))
Expand Down
38 changes: 37 additions & 1 deletion python/test/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,29 @@

MSCCLPP_ROOT_PATH = "/root/mscclpp"

def bench_time(niters: int, func):
# capture cuda graph for niters of the kernel launch
stream = cp.cuda.Stream(non_blocking=True)
with stream:
stream.begin_capture()
for i in range(niters):
func(stream)
graph = stream.end_capture()

# now run a warm up round
graph.launch(stream)

# now run the benchmark and measure time
start = cp.cuda.Event()
end = cp.cuda.Event()

start.record(stream)
graph.launch(stream)
end.record(stream)
end.synchronize()

return cp.cuda.get_elapsed_time(start, end) / niters * 1000.0

if __name__ == "__main__":
shm_comm = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED, 0, MPI.INFO_NULL)
N_GPUS_PER_NODE = shm_comm.size
Expand All @@ -19,7 +42,7 @@
cp.cuda.Device(MPI.COMM_WORLD.rank % N_GPUS_PER_NODE).use()
mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD)
executor = Executor(mscclpp_group.communicator, N_GPUS_PER_NODE)
execution_plan = ExecutionPlan(path.join(MSCCLPP_ROOT_PATH, "test", "execution-files", "allreduce.json"))
execution_plan = ExecutionPlan("allreduce_pairs", path.join(MSCCLPP_ROOT_PATH, "test", "execution-files", "allreduce.json"))

nelems = 1024 * 1024
cp.random.seed(42)
Expand All @@ -45,5 +68,18 @@
)
stream.synchronize()
assert cp.allclose(sendbuf, expected, atol=1e-3 * MPI.COMM_WORLD.size)

execution_time = bench_time(1000, lambda stream: executor.execute(
MPI.COMM_WORLD.rank,
sendbuf.data.ptr,
sendbuf.data.ptr,
sendbuf.nbytes,
sendbuf.nbytes,
DataType.float16,
512,
execution_plan,
stream.ptr,
))
print(f"Execution time: {execution_time} us, data size: {sendbuf.nbytes} bytes")
executor = None
mscclpp_group = None
48 changes: 26 additions & 22 deletions src/executor/execution_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ auto convertToChannelType = [](const std::string& str) {
namespace mscclpp {
using json = nlohmann::json;

ExecutionPlan::Impl::Impl(std::string planPath) : planPath(planPath), isUsingPacket(false) {}
ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath)
: name(name), planPath(planPath), isUsingPacket(false) {}

std::vector<ChannelInfo> ExecutionPlan::Impl::getChannelInfos(int rank, ChannelType channelType) const {
auto pred = [channelType](const ChannelInfo& info) { return info.channelType == channelType; };
Expand Down Expand Up @@ -121,7 +122,9 @@ int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->oper
void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize) {
std::ifstream file(this->planPath);
json obj = json::parse(file);
this->name = obj["name"];
if (this->name != obj["name"]) {
throw std::runtime_error("Plan name does not match");
}
std::string protocol = obj["protocol"];
if (protocol == "LL") {
this->isUsingPacket = true;
Expand Down Expand Up @@ -221,31 +224,31 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus) {
}
if (op.contains("i_cids")) {
operation.nInputs = op["i_cids"].size();
}
if (op.contains("o_cids")) {
operation.nOutputs = op["o_cids"].size();
}
for (int i = 0; i < operation.nInputs; i++) {
BufferType srcBufferType = convertToBufferType(op["i_buff"]["src"]);
BufferType dstBufferType = convertToBufferType(op["i_buff"]["dst"]);
operation.inputChannelIndexes[i] =
channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]];
operation.inputOffsets[i] = this->chunkSize * (int)op["i_cids"][i]["off"];
for (int i = 0; i < operation.nInputs; i++) {
BufferType srcBufferType = convertToBufferType(op["i_buff"]["src"]);
BufferType dstBufferType = convertToBufferType(op["i_buff"]["dst"]);
operation.inputChannelIndexes[i] =
channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]];
operation.inputOffsets[i] = this->chunkSize * (int)op["i_cids"][i]["off"];
}
}
// will have either srcs or i_cids
if (op.contains("srcs")) {
operation.nInputs = op["srcs"].size();
operation.inputBufferType = convertToBufferType(op["srcs"][0]["buff"]);
for (int i = 0; i < operation.nInputs; i++) {
operation.inputOffsets[i] = this->chunkSize * (int)op["srcs"][i]["off"];
}
}
for (int i = 0; i < operation.nInputs; i++) {
operation.inputOffsets[i] = this->chunkSize * (int)op["srcs"][i]["off"];
}
for (int i = 0; i < operation.nOutputs; i++) {
BufferType srcBufferType = convertToBufferType(op["o_buff"]["src"]);
BufferType dstBufferType = convertToBufferType(op["o_buff"]["dst"]);
operation.outputChannelIndexes[i] =
channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]];
operation.outputOffsets[i] = this->chunkSize * (int)op["o_cids"][i]["off"];
if (op.contains("o_cids")) {
operation.nOutputs = op["o_cids"].size();
for (int i = 0; i < operation.nOutputs; i++) {
BufferType srcBufferType = convertToBufferType(op["o_buff"]["src"]);
BufferType dstBufferType = convertToBufferType(op["o_buff"]["dst"]);
operation.outputChannelIndexes[i] =
channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]];
operation.outputOffsets[i] = this->chunkSize * (int)op["o_cids"][i]["off"];
}
}
if (op.contains("srcbuff")) {
operation.srcBufferType = convertToBufferType(op["srcbuff"]);
Expand All @@ -269,6 +272,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus) {
}
}

ExecutionPlan::ExecutionPlan(std::string planPath) : impl_(std::make_shared<Impl>(planPath)) {}
ExecutionPlan::ExecutionPlan(const std::string name, const std::string planPath)
: impl_(std::make_shared<Impl>(name, planPath)) {}

} // namespace mscclpp
10 changes: 4 additions & 6 deletions src/executor/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct Executor::Impl {
~Impl() = default;

ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t sendBufferSize,
size_t recvBufferSize, const ExecutionPlan& plan, cudaStream_t stream) {
size_t recvBufferSize, const ExecutionPlan& plan) {
ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name};
if (this->contexts.find(key) != this->contexts.end()) {
return this->contexts[key];
Expand All @@ -96,10 +96,8 @@ struct Executor::Impl {
this->setupDeviceExecutionPlan(context, rank, plan);
context.deviceExecutionPlansBuffer =
allocExtSharedCuda<char>(context.deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan));
MSCCLPP_CUDATHROW(cudaMemcpyAsync(context.deviceExecutionPlansBuffer.get(), context.deviceExecutionPlans.data(),
context.deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan),
cudaMemcpyHostToDevice, stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));
memcpyCuda(context.deviceExecutionPlansBuffer.get(), (char*)context.deviceExecutionPlans.data(),
context.deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan), cudaMemcpyHostToDevice);
this->contexts.insert({key, context});
return context;
}
Expand Down Expand Up @@ -286,7 +284,7 @@ void Executor::execute(int rank, void* sendbuff, void* recvBuff, size_t sendBuff
DataType dataType, int nthreads, const ExecutionPlan& plan, cudaStream_t stream,
PacketType packetType) {
ExecutionContext context =
this->impl_->setupExecutionContext(rank, sendbuff, recvBuff, sendBuffSize, recvBuffSize, plan, stream);
this->impl_->setupExecutionContext(rank, sendbuff, recvBuff, sendBuffSize, recvBuffSize, plan);
this->impl_->launchKernel(context, rank, nthreads, sendbuff, recvBuff, dataType, stream, packetType);
}

Expand Down
6 changes: 3 additions & 3 deletions src/include/execution_plan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct ChannelInfo {

struct ExecutionPlan::Impl {
public:
Impl(std::string planPath);
Impl(const std::string name, const std::string planPath);
~Impl() = default;

std::vector<ChannelInfo> getChannelInfos(int rank, ChannelType channelType) const;
Expand All @@ -61,15 +61,15 @@ struct ExecutionPlan::Impl {
void setupChannels(const nlohmann::json& gpus);
void setupOperations(const nlohmann::json& gpus);

std::string planPath;
const std::string name;
const std::string planPath;
bool isUsingPacket;
// operations for [rank][threadblock] = [operations]
std::unordered_map<int, std::vector<std::vector<Operation>>> operations;
std::unordered_map<int, std::vector<ChannelInfo>> channelInfos;
// threadblockChannelMap[rank][threadblock] = [channelIndex]
std::unordered_map<int, std::vector<std::vector<std::pair<int, ChannelKey>>>> threadblockSMChannelMap;
std::unordered_map<int, std::vector<std::vector<std::pair<int, ChannelKey>>>> threadblockProxyChannelMap;
std::string name;
std::unordered_map<int, uint32_t> inputChunks;
std::unordered_map<int, uint32_t> outputChunks;
std::unordered_map<int, uint32_t> scratchChunks;
Expand Down
2 changes: 1 addition & 1 deletion test/executor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int main() {
CUDACHECK(cudaSetDevice(rank));

std::shared_ptr<mscclpp::Executor> executor = std::make_shared<mscclpp::Executor>(comm, 8 /*nranksPerNode*/);
mscclpp::ExecutionPlan plan(MSCCLPP_ROOT_PATH + "/test/execution-files/allreduce_packet.json");
mscclpp::ExecutionPlan plan("allreduce_pairs", MSCCLPP_ROOT_PATH + "/test/execution-files/allreduce.json");
const int bufferSize = 1024 * 1024;
std::shared_ptr<char> sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking);
Expand Down

0 comments on commit 695ff94

Please sign in to comment.