From e316b825b9d8fb65c35f1174e3ae8b9c46296066 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sun, 1 Dec 2024 07:09:54 +0000 Subject: [PATCH 1/9] WIP --- apps/nccl/src/nccl.cu | 154 ++++++++++++++------------------- include/mscclpp/executor.hpp | 1 + src/executor/execution_plan.cc | 10 ++- src/include/execution_plan.hpp | 7 +- 4 files changed, 81 insertions(+), 91 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index 2b7e97360..acb3e9139 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -2,6 +2,8 @@ // Licensed under the MIT license. #include +#include +#include #include #include #include @@ -38,6 +40,17 @@ struct channelKey { bool operator==(const channelKey& other) const { return buff == other.buff && bytes == other.bytes; } }; +struct planKey { + uint64_t minMessageBytes; + uint64_t maxMessageBytes; + bool isInPlace; +}; + +struct executionPlanInstance { + planKey key; + std::shared_ptr plan; +}; + namespace std { template <> struct hash { @@ -57,8 +70,7 @@ struct ncclComm { std::vector> connections; std::vector> smSemaphores; std::shared_ptr executor; - std::shared_ptr allReducePacketIPPlan, allReducePacketOPPlan, allReduceIPPlan, - allReduceOPPlan, allGatherIPPlan, allGatherOPPlan, allGatherPacketIPPlan, allGatherPacketOPPlan; + std::unordered_map> executionPlans; std::unordered_map channelInInfos; std::unordered_map channelOutInfos; @@ -66,8 +78,6 @@ struct ncclComm { std::shared_ptr scratchBuff; std::vector remoteScratchRegMemories; - size_t allReduceSmallMessageSizeBoundary, allReduceLargeMessageSizeBoundary; - size_t allGatherSmallMessageSizeBoundary, allGatherLargeMessageSizeBoundary; uint32_t numScratchBuff; uint32_t buffFlag; }; @@ -183,6 +193,10 @@ static std::vector setupSmChannels(ncclComm_t comm, return channels; } +static std::pair load_execution_plan(const char* filename) { + return std::make_pair(std::string(filename), executionPlanInstance{}); +} + static std::shared_ptr> setupSmChannelDeviceHandles( const std::vector& smChannels) { std::vector> smChannelDeviceHandles; @@ -383,52 +397,16 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI setupRemoteMemories(commPtr->comm, rank, commPtr->scratchBuff.get(), SCRATCH_SIZE, mscclpp::Transport::CudaIpc); commPtr->executor = std::make_shared(mscclppComm); - if (getenv("ALLREDUCEPKT_IP_JSON_FILE")) - commPtr->allReducePacketIPPlan = std::make_shared( - mscclpp::ExecutionPlan("allreduce_packet", getenv("ALLREDUCEPKT_IP_JSON_FILE"))); - if (getenv("ALLREDUCEPKT_OP_JSON_FILE")) - commPtr->allReducePacketOPPlan = std::make_shared( - mscclpp::ExecutionPlan("allreduce_packet", getenv("ALLREDUCEPKT_OP_JSON_FILE"))); - if (getenv("ALLREDUCE_IP_JSON_FILE")) - commPtr->allReduceIPPlan = - std::make_shared(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_IP_JSON_FILE"))); - if (getenv("ALLREDUCE_OP_JSON_FILE")) - commPtr->allReduceOPPlan = - std::make_shared(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_OP_JSON_FILE"))); - if (getenv("ALLREDUCE_SMALL_MSG_BOUNDARY")) - commPtr->allReduceSmallMessageSizeBoundary = parseSize(getenv("ALLREDUCE_SMALL_MSG_BOUNDARY")); - else - commPtr->allReduceSmallMessageSizeBoundary = 16 * (1 << 10); - if (getenv("ALLREDUCE_LARGE_MSG_BOUNDARY")) - commPtr->allReduceLargeMessageSizeBoundary = parseSize(getenv("ALLREDUCE_LARGE_MSG_BOUNDARY")); - else - commPtr->allReduceLargeMessageSizeBoundary = 1 << 20; - - if (getenv("ALLGATHERPKT_IP_JSON_FILE")) - commPtr->allGatherPacketIPPlan = std::make_shared( - 
mscclpp::ExecutionPlan("allgather_pkt", getenv("ALLGATHERPKT_IP_JSON_FILE"))); - if (getenv("ALLGATHERPKT_OP_JSON_FILE")) - commPtr->allGatherPacketOPPlan = std::make_shared( - mscclpp::ExecutionPlan("allgather_pkt", getenv("ALLGATHERPKT_OP_JSON_FILE"))); - if (getenv("ALLGATHER_IP_JSON_FILE")) - commPtr->allGatherIPPlan = - std::make_shared(mscclpp::ExecutionPlan("allgather", getenv("ALLGATHER_IP_JSON_FILE"))); - if (getenv("ALLGATHER_OP_JSON_FILE")) - commPtr->allGatherOPPlan = - std::make_shared(mscclpp::ExecutionPlan("allgather", getenv("ALLGATHER_OP_JSON_FILE"))); - if (getenv("ALLGATHER_SMALL_MSG_BOUNDARY")) - commPtr->allGatherSmallMessageSizeBoundary = parseSize(getenv("ALLGATHER_SMALL_MSG_BOUNDARY")); - else - commPtr->allGatherSmallMessageSizeBoundary = (1 << 10); - if (getenv("ALLGATHER_LARGE_MSG_BOUNDARY")) - commPtr->allGatherLargeMessageSizeBoundary = parseSize(getenv("ALLGATHER_LARGE_MSG_BOUNDARY")); - else - commPtr->allGatherLargeMessageSizeBoundary = 1 << 20; - - if (commPtr->allReduceSmallMessageSizeBoundary > commPtr->allReduceLargeMessageSizeBoundary) - return ncclInvalidArgument; - if (commPtr->allGatherSmallMessageSizeBoundary > commPtr->allGatherLargeMessageSizeBoundary) - return ncclInvalidArgument; + if (getenv("COMMUNICATION_COLLECTIVE_DIR")) { + std::string collectiveDir = getenv("COMMUNICATION_COLLECTIVE_DIR"); + for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) { + if (entry.is_regular_file()) { + std::string filename = entry.path().filename().string(); + auto plan = load_execution_plan(entry.path().c_str()); + commPtr->executionPlans[plan.first].push_back(plan.second); + } + } + } *comm = commPtr; return ncclSuccess; @@ -548,40 +526,39 @@ NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t size_t bytes = count * ncclTypeSize(datatype); int rank = comm->comm->bootstrap()->getRank(); - if (bytes < comm->allReduceSmallMessageSizeBoundary) { - return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); - } else { - std::shared_ptr plan; - if (bytes <= comm->allReduceLargeMessageSizeBoundary) - plan = (sendbuff == recvbuff) ? comm->allReducePacketIPPlan : comm->allReducePacketOPPlan; - else { - plan = (sendbuff == recvbuff) ? 
comm->allReduceIPPlan : comm->allReduceOPPlan; + std::vector& plans = comm->executionPlans["allreduce"]; + std::shared_ptr plan; + bool inPlace = sendbuff == recvbuff; + for (const auto& p : plans) { + if (bytes >= p.key.minMessageBytes && bytes < p.key.maxMessageBytes && inPlace == p.key.isInPlace) { + plan = p.plan; + break; } + } - if (plan == nullptr) - return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); + if (plan == nullptr) + return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); - switch (datatype) { - case ncclFloat16: - comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, *plan, - stream, mscclpp::PacketType::LL8); - break; - case ncclFloat32: - comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT32, - *plan, stream, mscclpp::PacketType::LL8); - break; - case ncclBfloat16: - comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes, - mscclpp::DataType::BFLOAT16, *plan, stream, mscclpp::PacketType::LL8); - break; - case ncclInt32: - case ncclUint32: - comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, *plan, - stream, mscclpp::PacketType::LL8); - break; - default: - return ncclInvalidArgument; - } + switch (datatype) { + case ncclFloat16: + comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, *plan, + stream, mscclpp::PacketType::LL8); + break; + case ncclFloat32: + comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT32, *plan, + stream, mscclpp::PacketType::LL8); + break; + case ncclBfloat16: + comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes, + mscclpp::DataType::BFLOAT16, *plan, stream, mscclpp::PacketType::LL8); + break; + case ncclInt32: + case ncclUint32: + comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, *plan, + stream, mscclpp::PacketType::LL8); + break; + default: + return ncclInvalidArgument; } return ncclSuccess; @@ -601,16 +578,15 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t int rank = comm->comm->bootstrap()->getRank(); int nRank = comm->comm->bootstrap()->getNranks(); - if (bytes * nRank < comm->allGatherSmallMessageSizeBoundary) - return ncclAllGatherFallback(sendbuff, recvbuff, sendcount, datatype, comm, stream); - + std::vector& plans = comm->executionPlans["allgather"]; std::shared_ptr plan; - if (bytes * nRank <= comm->allGatherLargeMessageSizeBoundary) - plan = (sendbuff == (char*)recvbuff + rank * bytes) ? comm->allGatherPacketIPPlan : comm->allGatherPacketOPPlan; - else { - plan = (sendbuff == (char*)recvbuff + rank * bytes) ? 
comm->allGatherIPPlan : comm->allGatherOPPlan; + bool inPlace = sendbuff == recvbuff; + for (const auto& p : plans) { + if (bytes >= p.key.minMessageBytes && bytes < p.key.maxMessageBytes && inPlace == p.key.isInPlace) { + plan = p.plan; + break; + } } - if (plan == nullptr) return ncclAllGatherFallback(sendbuff, recvbuff, sendcount, datatype, comm, stream); switch (datatype) { diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp index e994548e4..c909d39ea 100644 --- a/include/mscclpp/executor.hpp +++ b/include/mscclpp/executor.hpp @@ -26,6 +26,7 @@ enum class PacketType { class ExecutionPlan { public: ExecutionPlan(const std::string& name, const std::string& planPath); + ExecutionPlan(const std::string& planPath); ~ExecutionPlan() = default; private: diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 20226b661..e9ad7b397 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -97,6 +97,8 @@ using json = nlohmann::json; ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath) : name(name), planPath(planPath), isUsingPacket(false) {} +ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) {} + std::vector ExecutionPlan::Impl::getChannelInfos(int rank, ChannelType channelType) const { auto pred = [channelType](const ChannelInfo& info) { return info.channelType == channelType; }; return filter(this->channelInfos.at(rank), pred); @@ -184,9 +186,12 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t constDstOffset) { std::ifstream file(this->planPath); json obj = json::parse(file); - if (this->name != obj["name"]) { + if (this->name.empty()) { + this->name = obj["name"]; + } else if (this->name != obj["name"]) { throw Error("Plan name does not match", ErrorCode::ExecutorError); } + this->collective = obj["collective"]; std::string protocol = obj["protocol"]; if (protocol == "LL") { this->isUsingPacket = true; @@ -194,6 +199,9 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, this->inputSize = inputSize; this->outputSize = outputSize; this->nThreadsPerBlock = obj.value("num_threads_per_block", 1024); + this->minMessageSize = obj.value("min_message_size", 0); + this->maxMessageSize = obj.value("max_message_size", std::numeric_limits::max()); + this->isInPlace = obj["in_place"]; const auto& gpus = obj["gpus"]; for (const auto& gpu : gpus) { diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 07292d748..609bcc1d8 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -63,6 +63,7 @@ struct ChannelInfo { struct ExecutionPlan::Impl { public: Impl(const std::string name, const std::string planPath); + Impl(const std::string planPath); ~Impl() = default; std::vector getChannelInfos(int rank, ChannelType channelType) const; @@ -85,7 +86,8 @@ struct ExecutionPlan::Impl { void reset(); void operationsReset(); - const std::string name; + std::string name; + std::string collective; const std::string planPath; bool isUsingPacket; // operations for [rank][threadblock] = [operations] @@ -106,6 +108,9 @@ struct ExecutionPlan::Impl { size_t inputSize; size_t outputSize; int nThreadsPerBlock; + uint64_t minMessageSize; + uint64_t maxMessageSize; + bool isInPlace; private: std::pair calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const; From 2b10c927423e02769dffaa3371fd3133087d57e3 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: 
Sun, 1 Dec 2024 18:47:02 +0000 Subject: [PATCH 2/9] WIP --- apps/nccl/src/nccl.cu | 13 ++++++++----- include/mscclpp/executor.hpp | 6 ++++++ src/executor/execution_plan.cc | 12 ++++++++++++ src/include/execution_plan.hpp | 4 ++-- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index acb3e9139..d002ad735 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -41,8 +41,8 @@ struct channelKey { }; struct planKey { - uint64_t minMessageBytes; - uint64_t maxMessageBytes; + size_t minMessageSize; + size_t maxMessageSize; bool isInPlace; }; @@ -194,7 +194,10 @@ static std::vector setupSmChannels(ncclComm_t comm, } static std::pair load_execution_plan(const char* filename) { - return std::make_pair(std::string(filename), executionPlanInstance{}); + std::shared_ptr plan = std::make_shared(filename); + std::string collective = plan->collective(); + planKey key{plan->minMessageSize(), plan->maxMessageSize(), plan->isInPlace()}; + return std::make_pair(collective, executionPlanInstance{key, plan}); } static std::shared_ptr> setupSmChannelDeviceHandles( @@ -530,7 +533,7 @@ NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t std::shared_ptr plan; bool inPlace = sendbuff == recvbuff; for (const auto& p : plans) { - if (bytes >= p.key.minMessageBytes && bytes < p.key.maxMessageBytes && inPlace == p.key.isInPlace) { + if (bytes >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { plan = p.plan; break; } @@ -582,7 +585,7 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t std::shared_ptr plan; bool inPlace = sendbuff == recvbuff; for (const auto& p : plans) { - if (bytes >= p.key.minMessageBytes && bytes < p.key.maxMessageBytes && inPlace == p.key.isInPlace) { + if (bytes >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { plan = p.plan; break; } diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp index c909d39ea..91261d16c 100644 --- a/include/mscclpp/executor.hpp +++ b/include/mscclpp/executor.hpp @@ -29,6 +29,12 @@ class ExecutionPlan { ExecutionPlan(const std::string& planPath); ~ExecutionPlan() = default; + std::string name() const; + std::string collective() const; + size_t minMessageSize() const; + size_t maxMessageSize() const; + bool isInPlace() const; + private: struct Impl; std::shared_ptr impl_; diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index e9ad7b397..c8d9c5c17 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -560,4 +560,16 @@ void ExecutionPlan::Impl::operationsReset() { this->operations.clear(); } ExecutionPlan::ExecutionPlan(const std::string& name, const std::string& planPath) : impl_(std::make_shared(name, planPath)) {} +ExecutionPlan::ExecutionPlan(const std::string& planPath) : impl_(std::make_shared(planPath)) {} + +std::string ExecutionPlan::name() const { return this->impl_->name; } + +std::string ExecutionPlan::collective() const { return this->impl_->collective; } + +size_t ExecutionPlan::minMessageSize() const { return this->impl_->minMessageSize; } + +size_t ExecutionPlan::maxMessageSize() const { return this->impl_->maxMessageSize; } + +bool ExecutionPlan::isInPlace() const { return this->impl_->isInPlace; } + } // namespace mscclpp diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 609bcc1d8..00a0c5e76 100644 --- a/src/include/execution_plan.hpp +++ 
b/src/include/execution_plan.hpp @@ -108,8 +108,8 @@ struct ExecutionPlan::Impl { size_t inputSize; size_t outputSize; int nThreadsPerBlock; - uint64_t minMessageSize; - uint64_t maxMessageSize; + size_t minMessageSize; + size_t maxMessageSize; bool isInPlace; private: From 4d299581f6a04eae72b77f66c3b9998bf0f18e4d Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Mon, 2 Dec 2024 00:19:37 +0000 Subject: [PATCH 3/9] WIP --- apps/nccl/src/nccl.cu | 9 +++++---- src/executor/execution_plan.cc | 16 +++++++++++----- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index d002ad735..f95d2aa66 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -400,8 +400,8 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI setupRemoteMemories(commPtr->comm, rank, commPtr->scratchBuff.get(), SCRATCH_SIZE, mscclpp::Transport::CudaIpc); commPtr->executor = std::make_shared(mscclppComm); - if (getenv("COMMUNICATION_COLLECTIVE_DIR")) { - std::string collectiveDir = getenv("COMMUNICATION_COLLECTIVE_DIR"); + if (getenv("MSCCLPP_EXECUTION_PLAN_DIR")) { + std::string collectiveDir = getenv("MSCCLPP_EXECUTION_PLAN_DIR"); for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) { if (entry.is_regular_file()) { std::string filename = entry.path().filename().string(); @@ -583,9 +583,10 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t std::vector& plans = comm->executionPlans["allgather"]; std::shared_ptr plan; - bool inPlace = sendbuff == recvbuff; + void* basePtr = (char*)sendbuff - rank * bytes; + bool inPlace = basePtr == recvbuff; for (const auto& p : plans) { - if (bytes >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { + if (bytes * nRank >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { plan = p.plan; break; } diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index c8d9c5c17..b5bc4bc99 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -97,7 +97,15 @@ using json = nlohmann::json; ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath) : name(name), planPath(planPath), isUsingPacket(false) {} -ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) {} +ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) { + std::ifstream file(this->planPath); + json obj = json::parse(file); + this->name = obj["name"]; + this->collective = obj["collective"]; + this->isInPlace = obj["inplace"]; + this->minMessageSize = obj.value("min_message_size", 0); + this->maxMessageSize = obj.value("max_message_size", std::numeric_limits::max()); +} std::vector ExecutionPlan::Impl::getChannelInfos(int rank, ChannelType channelType) const { auto pred = [channelType](const ChannelInfo& info) { return info.channelType == channelType; }; @@ -186,9 +194,7 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t constDstOffset) { std::ifstream file(this->planPath); json obj = json::parse(file); - if (this->name.empty()) { - this->name = obj["name"]; - } else if (this->name != obj["name"]) { + if (this->name != obj["name"]) { throw Error("Plan name does not match", ErrorCode::ExecutorError); } this->collective = obj["collective"]; @@ -201,7 +207,7 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t 
outputSize, this->nThreadsPerBlock = obj.value("num_threads_per_block", 1024); this->minMessageSize = obj.value("min_message_size", 0); this->maxMessageSize = obj.value("max_message_size", std::numeric_limits::max()); - this->isInPlace = obj["in_place"]; + this->isInPlace = obj["inplace"]; const auto& gpus = obj["gpus"]; for (const auto& gpu : gpus) { From bb25a03a4c9e6de8e10fae079b8f730cbc6d7e0e Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Mon, 2 Dec 2024 02:48:26 +0000 Subject: [PATCH 4/9] WIP --- apps/nccl/src/nccl.cu | 3 ++- include/mscclpp/executor.hpp | 1 - python/mscclpp/executor_py.cpp | 6 +++++- python/test/executor_test.py | 22 ++++++++++------------ src/executor/execution_plan.cc | 6 ------ test/executor_test.cc | 16 +++++++--------- test/mp_unit/executor_tests.cc | 2 +- 7 files changed, 25 insertions(+), 31 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index f95d2aa66..74c95bac8 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -585,8 +585,9 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t std::shared_ptr plan; void* basePtr = (char*)sendbuff - rank * bytes; bool inPlace = basePtr == recvbuff; + const size_t totalBytes = bytes * nRank; for (const auto& p : plans) { - if (bytes * nRank >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { + if (totalBytes >= p.key.minMessageSize && totalBytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { plan = p.plan; break; } diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp index 91261d16c..5d76983e6 100644 --- a/include/mscclpp/executor.hpp +++ b/include/mscclpp/executor.hpp @@ -25,7 +25,6 @@ enum class PacketType { class ExecutionPlan { public: - ExecutionPlan(const std::string& name, const std::string& planPath); ExecutionPlan(const std::string& planPath); ~ExecutionPlan() = default; diff --git a/python/mscclpp/executor_py.cpp b/python/mscclpp/executor_py.cpp index c550ecb00..d3add7194 100644 --- a/python/mscclpp/executor_py.cpp +++ b/python/mscclpp/executor_py.cpp @@ -22,7 +22,11 @@ void register_executor(nb::module_& m) { nb::enum_(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16); nb::class_(m, "ExecutionPlan") - .def(nb::init(), nb::arg("name"), nb::arg("planPath")); + .def(nb::init(), nb::arg("planPath")) + .def("name", &ExecutionPlan::name) + .def("collective", &ExecutionPlan::collective) + .def("min_message_size", &ExecutionPlan::minMessageSize) + .def("max_message_size", &ExecutionPlan::maxMessageSize); nb::class_(m, "Executor") .def(nb::init>(), nb::arg("comm")) diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 5197b79b9..67e9929f1 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -59,7 +59,7 @@ def bench_time(n_iters: int, n_graph_iters: int, func): def bench_correctness( - execution_plan_name: str, + collective: str, input_buf: cp.ndarray, result_buf: cp.ndarray, test_buf: cp.ndarray, @@ -72,9 +72,9 @@ def bench_correctness( type_size = cp.dtype(parse_dtype(dtype_str)).itemsize fill_data_kernel_name = "fill_data_%s" % dtype_str - if "allgather" in execution_plan_name: + if "allgather" in collective: coll = "all_gather" - elif "reducescatter" in execution_plan_name: + elif "reducescatter" in collective: coll = "reduce_scatter" else: coll = "all_reduce" @@ -142,7 +142,7 @@ def allocate_buffer(nelems, dtype): def build_bufs( - execution_plan_name: str, + collective: str, size: int, in_place: bool, 
dtype: cp.dtype, @@ -153,7 +153,7 @@ def build_bufs( assert (size % type_size) == 0, "size %d not multiple of type size %d" % (size, type_size) nelems = size // type_size - if "allgather" in execution_plan_name: + if "allgather" in collective: assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks) nelems_input = nelems if in_place else nelems // num_ranks else: @@ -162,7 +162,7 @@ def build_bufs( result_buf = allocate_buffer(nelems_output, dtype=dtype) if in_place: - if "allgather" in execution_plan_name: + if "allgather" in collective: input_buf = cp.split(result_buf, num_ranks)[rank] else: input_buf = result_buf @@ -174,7 +174,6 @@ def build_bufs( def main( - execution_plan_name: str, execution_plan_path: str, size: int, in_place: bool = True, @@ -189,11 +188,12 @@ def main( npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) - execution_plan = ExecutionPlan(execution_plan_name, execution_plan_path) + execution_plan = ExecutionPlan(execution_plan_path) + collective = execution_plan.collective() dtype = parse_dtype(dtype_str) input_buf, result_buf, test_buf = build_bufs( - execution_plan_name, + collective, size, in_place, dtype, @@ -215,7 +215,7 @@ def main( mscclpp_group.barrier() bench_correctness( - execution_plan_name, + collective, input_buf, result_buf, test_buf, @@ -242,7 +242,6 @@ def main( if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("-n", "--execution_plan_name", type=str, required=True) parser.add_argument("-path", "--execution_plan_path", type=str, required=True) parser.add_argument("--size", type=str, required=True) parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") @@ -258,7 +257,6 @@ def main( buffer_size = parse_size(args.size) main( - args.execution_plan_name, args.execution_plan_path, buffer_size, args.in_place, diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index b5bc4bc99..144fb4174 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -94,9 +94,6 @@ std::set groupChannelType{mscclpp::ChannelType::NVLS}; namespace mscclpp { using json = nlohmann::json; -ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath) - : name(name), planPath(planPath), isUsingPacket(false) {} - ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) { std::ifstream file(this->planPath); json obj = json::parse(file); @@ -563,9 +560,6 @@ void ExecutionPlan::Impl::reset() { void ExecutionPlan::Impl::operationsReset() { this->operations.clear(); } -ExecutionPlan::ExecutionPlan(const std::string& name, const std::string& planPath) - : impl_(std::make_shared(name, planPath)) {} - ExecutionPlan::ExecutionPlan(const std::string& planPath) : impl_(std::make_shared(planPath)) {} std::string ExecutionPlan::name() const { return this->impl_->name; } diff --git a/test/executor_test.cc b/test/executor_test.cc index e4ebcc972..68e8bfa32 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -89,9 +89,8 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s } int main(int argc, char* argv[]) { - if (argc != 6 && argc != 7) { + if (argc != 5 && argc != 6) { std::cerr << "Usage: " << argv[0] << " " - << " " << " " << " " << " " @@ -107,14 +106,13 @@ int main(int argc, char* argv[]) { MSCCLPP_CUDATHROW(cudaSetDevice(rank)); const size_t bufferSize = parseSize(argv[1]); - const 
std::string executionPlanName = argv[2]; - const std::string executionPlanPath = argv[3]; - const int niters = std::stoi(argv[4]); - const int ngraphIters = std::stoi(argv[5]); + const std::string executionPlanPath = argv[2]; + const int niters = std::stoi(argv[3]); + const int ngraphIters = std::stoi(argv[4]); const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); mscclpp::PacketType packetType = mscclpp::PacketType::LL16; - if (argc == 7) { - packetType = parsePacketType(argv[6]); + if (argc == 6) { + packetType = parsePacketType(argv[5]); } std::shared_ptr bootstrap; @@ -130,7 +128,7 @@ int main(int argc, char* argv[]) { NpKit::Init(rank); } - mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath); + mscclpp::ExecutionPlan plan(executionPlanPath); std::shared_ptr sendbuff; if (mscclpp::isNvlsSupported()) { sendbuff = mscclpp::allocSharedPhysicalCuda(bufferSize); diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc index 49952b6b4..116470dd1 100644 --- a/test/mp_unit/executor_tests.cc +++ b/test/mp_unit/executor_tests.cc @@ -55,7 +55,7 @@ TEST_F(ExecutorTest, TwoNodesAllreduce) { std::filesystem::path path = executablePath; std::filesystem::path executionFilesPath = path.parent_path().parent_path().parent_path() / "test/execution-files/allreduce.json"; - mscclpp::ExecutionPlan plan("allreduce_pairs", executionFilesPath.string()); + mscclpp::ExecutionPlan plan(executionFilesPath.string()); const int bufferSize = 1024 * 1024; std::shared_ptr sendbuff = mscclpp::allocExtSharedCuda(bufferSize); mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking); From 4f8f9fd1372c23c6f40e551f9deec99d7ca2b870 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Mon, 2 Dec 2024 02:55:10 +0000 Subject: [PATCH 5/9] WIP --- apps/nccl/src/nccl.cu | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index 74c95bac8..bef0eb1c3 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -2,7 +2,6 @@ // Licensed under the MIT license. 
#include -#include #include #include #include From fac80436291724101ffb0af7105f4411f3b95a0d Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Mon, 2 Dec 2024 17:54:05 +0000 Subject: [PATCH 6/9] WIP --- apps/nccl/src/nccl.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index bef0eb1c3..e9749cd07 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -192,7 +192,7 @@ static std::vector setupSmChannels(ncclComm_t comm, return channels; } -static std::pair load_execution_plan(const char* filename) { +static std::pair loadExecutionPlan(const char* filename) { std::shared_ptr plan = std::make_shared(filename); std::string collective = plan->collective(); planKey key{plan->minMessageSize(), plan->maxMessageSize(), plan->isInPlace()}; @@ -404,7 +404,7 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) { if (entry.is_regular_file()) { std::string filename = entry.path().filename().string(); - auto plan = load_execution_plan(entry.path().c_str()); + auto plan = loadExecutionPlan(entry.path().c_str()); commPtr->executionPlans[plan.first].push_back(plan.second); } } From 29b799d85ac5c2c5421434b37595205d4b139d90 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 3 Dec 2024 06:33:45 +0000 Subject: [PATCH 7/9] fix comments --- apps/nccl/src/nccl.cu | 7 +++++-- src/include/execution_plan.hpp | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index e9749cd07..9093f9868 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -192,7 +192,7 @@ static std::vector setupSmChannels(ncclComm_t comm, return channels; } -static std::pair loadExecutionPlan(const char* filename) { +static std::pair loadExecutionPlan(const std::string& filename) { std::shared_ptr plan = std::make_shared(filename); std::string collective = plan->collective(); planKey key{plan->minMessageSize(), plan->maxMessageSize(), plan->isInPlace()}; @@ -401,10 +401,13 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI if (getenv("MSCCLPP_EXECUTION_PLAN_DIR")) { std::string collectiveDir = getenv("MSCCLPP_EXECUTION_PLAN_DIR"); + if (!std::filesystem::is_directory(collectiveDir)) { + return ncclInvalidArgument; + } for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) { if (entry.is_regular_file()) { std::string filename = entry.path().filename().string(); - auto plan = loadExecutionPlan(entry.path().c_str()); + auto plan = loadExecutionPlan(entry.path()); commPtr->executionPlans[plan.first].push_back(plan.second); } } diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 00a0c5e76..8d291f45f 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -62,7 +62,6 @@ struct ChannelInfo { struct ExecutionPlan::Impl { public: - Impl(const std::string name, const std::string planPath); Impl(const std::string planPath); ~Impl() = default; From 0d6af1dbda71c82cfe9dac48cfa45c29e84ba95b Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 3 Dec 2024 18:55:21 +0000 Subject: [PATCH 8/9] address comments --- apps/nccl/src/nccl.cu | 1 - test/execution-files/allreduce.json | 2 +- test/execution-files/allreduce_nvls.json | 2 +- test/execution-files/allreduce_packet.json | 2 +- test/execution-files/sendrecv.json | 2 +- test/execution-files/sendrecv_packet.json | 2 +- 6 files changed, 5 
insertions(+), 6 deletions(-) diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index 9093f9868..cd75edfea 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -406,7 +406,6 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI } for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) { if (entry.is_regular_file()) { - std::string filename = entry.path().filename().string(); auto plan = loadExecutionPlan(entry.path()); commPtr->executionPlans[plan.first].push_back(plan.second); } diff --git a/test/execution-files/allreduce.json b/test/execution-files/allreduce.json index afc921f4b..0a0a76590 100644 --- a/test/execution-files/allreduce.json +++ b/test/execution-files/allreduce.json @@ -1,6 +1,6 @@ { "name": "allreduce_pairs", - "colletive": "allreduce", + "collective": "allreduce", "protocol": "Simple", "inplace": true, "num_threads_per_block": 512, diff --git a/test/execution-files/allreduce_nvls.json b/test/execution-files/allreduce_nvls.json index 069b5df9d..e882d0285 100644 --- a/test/execution-files/allreduce_nvls.json +++ b/test/execution-files/allreduce_nvls.json @@ -1,6 +1,6 @@ { "name": "allreduce_nvls", - "colletive": "allreduce", + "collective": "allreduce", "protocol": "Simple", "inplace": true, "gpus": [ diff --git a/test/execution-files/allreduce_packet.json b/test/execution-files/allreduce_packet.json index d35a4e96b..545523447 100644 --- a/test/execution-files/allreduce_packet.json +++ b/test/execution-files/allreduce_packet.json @@ -1,6 +1,6 @@ { "name": "allreduce_pairs", - "colletive": "allreduce", + "collective": "allreduce", "protocol": "LL", "inplace": true, "num_threads_per_block": 768, diff --git a/test/execution-files/sendrecv.json b/test/execution-files/sendrecv.json index e84a06f7d..439d5f4f4 100644 --- a/test/execution-files/sendrecv.json +++ b/test/execution-files/sendrecv.json @@ -1,6 +1,6 @@ { "name": "send_recv", - "colletive": "sendrecv", + "collective": "sendrecv", "protocol": "Simple", "inplace": false, "gpus": [ diff --git a/test/execution-files/sendrecv_packet.json b/test/execution-files/sendrecv_packet.json index 3156b6191..5d63ad7c0 100644 --- a/test/execution-files/sendrecv_packet.json +++ b/test/execution-files/sendrecv_packet.json @@ -1,6 +1,6 @@ { "name": "send_recv", - "colletive": "sendrecv", + "collective": "sendrecv", "protocol": "LL", "inplace": false, "gpus": [ From 8d749a38c036392542b884a5f9771e870ca90f23 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 3 Dec 2024 19:58:08 +0000 Subject: [PATCH 9/9] update --- python/test/test_mscclpp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index 1be0b1821..3e7fa90da 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -607,7 +607,7 @@ def test_executor(mpi_group: MpiGroup, filename: str): npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) - execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename)) + execution_plan = ExecutionPlan(os.path.join(project_dir, "test", "execution-files", filename)) nelems = 1024 * 1024 cp.random.seed(42)
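
Notes (reviewer sketches, not part of the patches): the series replaces the per-collective environment variables (ALLREDUCEPKT_*/ALLREDUCE_*/ALLGATHERPKT_*/ALLGATHER_*_JSON_FILE plus the *_MSG_BOUNDARY sizes) with a single MSCCLPP_EXECUTION_PLAN_DIR that ncclCommInitRank scans once. A minimal standalone sketch of that registry-building step follows; the planKey/executionPlanInstance types mirror the structs added to apps/nccl/src/nccl.cu, and the template arguments (which the diff rendering above dropped) are reconstructed here as assumptions.

// Sketch of the execution-plan registry built in ncclCommInitRank (patches 1-7).
// Assumes mscclpp::ExecutionPlan exposes the collective()/minMessageSize()/
// maxMessageSize()/isInPlace() accessors added in include/mscclpp/executor.hpp.
#include <filesystem>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <mscclpp/executor.hpp>

struct planKey {
  size_t minMessageSize;
  size_t maxMessageSize;
  bool isInPlace;
};

struct executionPlanInstance {
  planKey key;
  std::shared_ptr<mscclpp::ExecutionPlan> plan;
};

using PlanRegistry = std::unordered_map<std::string, std::vector<executionPlanInstance>>;

static std::pair<std::string, executionPlanInstance> loadExecutionPlan(const std::string& filename) {
  auto plan = std::make_shared<mscclpp::ExecutionPlan>(filename);
  planKey key{plan->minMessageSize(), plan->maxMessageSize(), plan->isInPlace()};
  return std::make_pair(plan->collective(), executionPlanInstance{key, plan});
}

// One vector of candidate plans per collective name ("allreduce", "allgather", ...).
static PlanRegistry loadPlansFromDir(const std::string& dir) {
  PlanRegistry registry;
  for (const auto& entry : std::filesystem::directory_iterator(dir)) {
    if (!entry.is_regular_file()) continue;
    auto [collective, instance] = loadExecutionPlan(entry.path().string());
    registry[collective].push_back(std::move(instance));
  }
  return registry;
}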
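
Each collective call then picks the first registered plan whose [min_message_size, max_message_size) window and in-place flag match the request, and falls back to the existing kernels otherwise. The helper below is a hypothetical name (the real code inlines the loop in ncclAllReduce and ncclAllGather) and reuses the types from the previous sketch; note that after patch 3 the allgather path matches on the total gathered size and treats "in place" as sendbuff sitting at its own rank's offset inside recvbuff.

// Hypothetical selectPlan() helper mirroring the loops added to ncclAllReduce and
// ncclAllGather: the first plan whose size window and in-place flag match wins.
static std::shared_ptr<mscclpp::ExecutionPlan> selectPlan(const std::vector<executionPlanInstance>& plans,
                                                          size_t messageBytes, bool isInPlace) {
  for (const auto& p : plans) {
    if (messageBytes >= p.key.minMessageSize && messageBytes < p.key.maxMessageSize &&
        isInPlace == p.key.isInPlace) {
      return p.plan;
    }
  }
  return nullptr;  // caller falls back to ncclAllReduceFallback / ncclAllGatherFallback
}

// In ncclAllReduce terms:  selectPlan(plans, bytes, sendbuff == recvbuff)
// In ncclAllGather terms:  selectPlan(plans, bytes * nRank,
//                                     (char*)sendbuff - rank * bytes == recvbuff)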
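
The directory scan stays cheap because patch 3 adds a header-only ExecutionPlan::Impl constructor that parses just the plan metadata: name, collective, inplace, and an optional min_message_size/max_message_size window defaulting to [0, max). The reconstruction below shows the same reads against nlohmann::json; the std::numeric_limits template argument is an assumption, since the diff rendering dropped it.

// Header-only view of a plan JSON, as read by the new ExecutionPlan::Impl
// constructor in src/executor/execution_plan.cc (template argument assumed).
#include <cstddef>
#include <fstream>
#include <limits>
#include <string>

#include <nlohmann/json.hpp>

struct PlanHeader {
  std::string name;        // e.g. "allreduce_pairs"
  std::string collective;  // e.g. "allreduce"; patch 8 fixes the "colletive" typos in the test plans
  bool isInPlace;          // key spelled "inplace" (patch 3 switches from "in_place")
  size_t minMessageSize;   // "min_message_size", default 0
  size_t maxMessageSize;   // "max_message_size", default SIZE_MAX
};

static PlanHeader readPlanHeader(const std::string& planPath) {
  std::ifstream file(planPath);
  nlohmann::json obj = nlohmann::json::parse(file);
  PlanHeader h;
  h.name = obj["name"];
  h.collective = obj["collective"];
  h.isInPlace = obj["inplace"];
  h.minMessageSize = obj.value("min_message_size", static_cast<size_t>(0));
  h.maxMessageSize = obj.value("max_message_size", std::numeric_limits<size_t>::max());
  return h;
}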
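
Because the name argument is gone from the public constructor, every caller touched by the series (test/executor_test.cc, python/mscclpp/executor_py.cpp, python/test/executor_test.py, the mp_unit tests) now passes only the plan path and queries the metadata back from the plan. A caller-side sketch, modeled loosely on the updated test/executor_test.cc:

// Caller-side view of the API change: construct from the plan path alone and
// read the metadata from the plan instead of supplying a name up front.
#include <iostream>
#include <mscclpp/executor.hpp>

int main(int argc, char* argv[]) {
  if (argc != 2) {
    std::cerr << "Usage: " << argv[0] << " <execution plan path>" << std::endl;
    return 1;
  }
  mscclpp::ExecutionPlan plan(argv[1]);  // previously: ExecutionPlan(name, path)
  std::cout << "name=" << plan.name() << " collective=" << plan.collective() << " window=["
            << plan.minMessageSize() << ", " << plan.maxMessageSize() << ")"
            << " inplace=" << plan.isInPlace() << std::endl;
  return 0;
}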