diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index 2b7e97360..cd75edfea 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -2,6 +2,7 @@ // Licensed under the MIT license. #include +#include #include #include #include @@ -38,6 +39,17 @@ struct channelKey { bool operator==(const channelKey& other) const { return buff == other.buff && bytes == other.bytes; } }; +struct planKey { + size_t minMessageSize; + size_t maxMessageSize; + bool isInPlace; +}; + +struct executionPlanInstance { + planKey key; + std::shared_ptr plan; +}; + namespace std { template <> struct hash { @@ -57,8 +69,7 @@ struct ncclComm { std::vector> connections; std::vector> smSemaphores; std::shared_ptr executor; - std::shared_ptr allReducePacketIPPlan, allReducePacketOPPlan, allReduceIPPlan, - allReduceOPPlan, allGatherIPPlan, allGatherOPPlan, allGatherPacketIPPlan, allGatherPacketOPPlan; + std::unordered_map> executionPlans; std::unordered_map channelInInfos; std::unordered_map channelOutInfos; @@ -66,8 +77,6 @@ struct ncclComm { std::shared_ptr scratchBuff; std::vector remoteScratchRegMemories; - size_t allReduceSmallMessageSizeBoundary, allReduceLargeMessageSizeBoundary; - size_t allGatherSmallMessageSizeBoundary, allGatherLargeMessageSizeBoundary; uint32_t numScratchBuff; uint32_t buffFlag; }; @@ -183,6 +192,13 @@ static std::vector setupSmChannels(ncclComm_t comm, return channels; } +static std::pair loadExecutionPlan(const std::string& filename) { + std::shared_ptr plan = std::make_shared(filename); + std::string collective = plan->collective(); + planKey key{plan->minMessageSize(), plan->maxMessageSize(), plan->isInPlace()}; + return std::make_pair(collective, executionPlanInstance{key, plan}); +} + static std::shared_ptr> setupSmChannelDeviceHandles( const std::vector& smChannels) { std::vector> smChannelDeviceHandles; @@ -383,52 +399,18 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI setupRemoteMemories(commPtr->comm, rank, commPtr->scratchBuff.get(), SCRATCH_SIZE, mscclpp::Transport::CudaIpc); commPtr->executor = std::make_shared(mscclppComm); - if (getenv("ALLREDUCEPKT_IP_JSON_FILE")) - commPtr->allReducePacketIPPlan = std::make_shared( - mscclpp::ExecutionPlan("allreduce_packet", getenv("ALLREDUCEPKT_IP_JSON_FILE"))); - if (getenv("ALLREDUCEPKT_OP_JSON_FILE")) - commPtr->allReducePacketOPPlan = std::make_shared( - mscclpp::ExecutionPlan("allreduce_packet", getenv("ALLREDUCEPKT_OP_JSON_FILE"))); - if (getenv("ALLREDUCE_IP_JSON_FILE")) - commPtr->allReduceIPPlan = - std::make_shared(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_IP_JSON_FILE"))); - if (getenv("ALLREDUCE_OP_JSON_FILE")) - commPtr->allReduceOPPlan = - std::make_shared(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_OP_JSON_FILE"))); - if (getenv("ALLREDUCE_SMALL_MSG_BOUNDARY")) - commPtr->allReduceSmallMessageSizeBoundary = parseSize(getenv("ALLREDUCE_SMALL_MSG_BOUNDARY")); - else - commPtr->allReduceSmallMessageSizeBoundary = 16 * (1 << 10); - if (getenv("ALLREDUCE_LARGE_MSG_BOUNDARY")) - commPtr->allReduceLargeMessageSizeBoundary = parseSize(getenv("ALLREDUCE_LARGE_MSG_BOUNDARY")); - else - commPtr->allReduceLargeMessageSizeBoundary = 1 << 20; - - if (getenv("ALLGATHERPKT_IP_JSON_FILE")) - commPtr->allGatherPacketIPPlan = std::make_shared( - mscclpp::ExecutionPlan("allgather_pkt", getenv("ALLGATHERPKT_IP_JSON_FILE"))); - if (getenv("ALLGATHERPKT_OP_JSON_FILE")) - commPtr->allGatherPacketOPPlan = std::make_shared( - mscclpp::ExecutionPlan("allgather_pkt", getenv("ALLGATHERPKT_OP_JSON_FILE"))); - if (getenv("ALLGATHER_IP_JSON_FILE")) - commPtr->allGatherIPPlan = - std::make_shared(mscclpp::ExecutionPlan("allgather", getenv("ALLGATHER_IP_JSON_FILE"))); - if (getenv("ALLGATHER_OP_JSON_FILE")) - commPtr->allGatherOPPlan = - std::make_shared(mscclpp::ExecutionPlan("allgather", getenv("ALLGATHER_OP_JSON_FILE"))); - if (getenv("ALLGATHER_SMALL_MSG_BOUNDARY")) - commPtr->allGatherSmallMessageSizeBoundary = parseSize(getenv("ALLGATHER_SMALL_MSG_BOUNDARY")); - else - commPtr->allGatherSmallMessageSizeBoundary = (1 << 10); - if (getenv("ALLGATHER_LARGE_MSG_BOUNDARY")) - commPtr->allGatherLargeMessageSizeBoundary = parseSize(getenv("ALLGATHER_LARGE_MSG_BOUNDARY")); - else - commPtr->allGatherLargeMessageSizeBoundary = 1 << 20; - - if (commPtr->allReduceSmallMessageSizeBoundary > commPtr->allReduceLargeMessageSizeBoundary) - return ncclInvalidArgument; - if (commPtr->allGatherSmallMessageSizeBoundary > commPtr->allGatherLargeMessageSizeBoundary) - return ncclInvalidArgument; + if (getenv("MSCCLPP_EXECUTION_PLAN_DIR")) { + std::string collectiveDir = getenv("MSCCLPP_EXECUTION_PLAN_DIR"); + if (!std::filesystem::is_directory(collectiveDir)) { + return ncclInvalidArgument; + } + for (const auto& entry : std::filesystem::directory_iterator(collectiveDir)) { + if (entry.is_regular_file()) { + auto plan = loadExecutionPlan(entry.path()); + commPtr->executionPlans[plan.first].push_back(plan.second); + } + } + } *comm = commPtr; return ncclSuccess; @@ -548,40 +530,39 @@ NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t size_t bytes = count * ncclTypeSize(datatype); int rank = comm->comm->bootstrap()->getRank(); - if (bytes < comm->allReduceSmallMessageSizeBoundary) { - return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); - } else { - std::shared_ptr plan; - if (bytes <= comm->allReduceLargeMessageSizeBoundary) - plan = (sendbuff == recvbuff) ? comm->allReducePacketIPPlan : comm->allReducePacketOPPlan; - else { - plan = (sendbuff == recvbuff) ? comm->allReduceIPPlan : comm->allReduceOPPlan; + std::vector& plans = comm->executionPlans["allreduce"]; + std::shared_ptr plan; + bool inPlace = sendbuff == recvbuff; + for (const auto& p : plans) { + if (bytes >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { + plan = p.plan; + break; } + } - if (plan == nullptr) - return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); + if (plan == nullptr) + return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); - switch (datatype) { - case ncclFloat16: - comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, *plan, - stream, mscclpp::PacketType::LL8); - break; - case ncclFloat32: - comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT32, - *plan, stream, mscclpp::PacketType::LL8); - break; - case ncclBfloat16: - comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes, - mscclpp::DataType::BFLOAT16, *plan, stream, mscclpp::PacketType::LL8); - break; - case ncclInt32: - case ncclUint32: - comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, *plan, - stream, mscclpp::PacketType::LL8); - break; - default: - return ncclInvalidArgument; - } + switch (datatype) { + case ncclFloat16: + comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, *plan, + stream, mscclpp::PacketType::LL8); + break; + case ncclFloat32: + comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT32, *plan, + stream, mscclpp::PacketType::LL8); + break; + case ncclBfloat16: + comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes, + mscclpp::DataType::BFLOAT16, *plan, stream, mscclpp::PacketType::LL8); + break; + case ncclInt32: + case ncclUint32: + comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, *plan, + stream, mscclpp::PacketType::LL8); + break; + default: + return ncclInvalidArgument; } return ncclSuccess; @@ -601,16 +582,17 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t int rank = comm->comm->bootstrap()->getRank(); int nRank = comm->comm->bootstrap()->getNranks(); - if (bytes * nRank < comm->allGatherSmallMessageSizeBoundary) - return ncclAllGatherFallback(sendbuff, recvbuff, sendcount, datatype, comm, stream); - + std::vector& plans = comm->executionPlans["allgather"]; std::shared_ptr plan; - if (bytes * nRank <= comm->allGatherLargeMessageSizeBoundary) - plan = (sendbuff == (char*)recvbuff + rank * bytes) ? comm->allGatherPacketIPPlan : comm->allGatherPacketOPPlan; - else { - plan = (sendbuff == (char*)recvbuff + rank * bytes) ? comm->allGatherIPPlan : comm->allGatherOPPlan; + void* basePtr = (char*)sendbuff - rank * bytes; + bool inPlace = basePtr == recvbuff; + const size_t totalBytes = bytes * nRank; + for (const auto& p : plans) { + if (totalBytes >= p.key.minMessageSize && totalBytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) { + plan = p.plan; + break; + } } - if (plan == nullptr) return ncclAllGatherFallback(sendbuff, recvbuff, sendcount, datatype, comm, stream); switch (datatype) { diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp index e994548e4..5d76983e6 100644 --- a/include/mscclpp/executor.hpp +++ b/include/mscclpp/executor.hpp @@ -25,9 +25,15 @@ enum class PacketType { class ExecutionPlan { public: - ExecutionPlan(const std::string& name, const std::string& planPath); + ExecutionPlan(const std::string& planPath); ~ExecutionPlan() = default; + std::string name() const; + std::string collective() const; + size_t minMessageSize() const; + size_t maxMessageSize() const; + bool isInPlace() const; + private: struct Impl; std::shared_ptr impl_; diff --git a/python/mscclpp/executor_py.cpp b/python/mscclpp/executor_py.cpp index c550ecb00..d3add7194 100644 --- a/python/mscclpp/executor_py.cpp +++ b/python/mscclpp/executor_py.cpp @@ -22,7 +22,11 @@ void register_executor(nb::module_& m) { nb::enum_(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16); nb::class_(m, "ExecutionPlan") - .def(nb::init(), nb::arg("name"), nb::arg("planPath")); + .def(nb::init(), nb::arg("planPath")) + .def("name", &ExecutionPlan::name) + .def("collective", &ExecutionPlan::collective) + .def("min_message_size", &ExecutionPlan::minMessageSize) + .def("max_message_size", &ExecutionPlan::maxMessageSize); nb::class_(m, "Executor") .def(nb::init>(), nb::arg("comm")) diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 5197b79b9..67e9929f1 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -59,7 +59,7 @@ def bench_time(n_iters: int, n_graph_iters: int, func): def bench_correctness( - execution_plan_name: str, + collective: str, input_buf: cp.ndarray, result_buf: cp.ndarray, test_buf: cp.ndarray, @@ -72,9 +72,9 @@ def bench_correctness( type_size = cp.dtype(parse_dtype(dtype_str)).itemsize fill_data_kernel_name = "fill_data_%s" % dtype_str - if "allgather" in execution_plan_name: + if "allgather" in collective: coll = "all_gather" - elif "reducescatter" in execution_plan_name: + elif "reducescatter" in collective: coll = "reduce_scatter" else: coll = "all_reduce" @@ -142,7 +142,7 @@ def allocate_buffer(nelems, dtype): def build_bufs( - execution_plan_name: str, + collective: str, size: int, in_place: bool, dtype: cp.dtype, @@ -153,7 +153,7 @@ def build_bufs( assert (size % type_size) == 0, "size %d not multiple of type size %d" % (size, type_size) nelems = size // type_size - if "allgather" in execution_plan_name: + if "allgather" in collective: assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks) nelems_input = nelems if in_place else nelems // num_ranks else: @@ -162,7 +162,7 @@ def build_bufs( result_buf = allocate_buffer(nelems_output, dtype=dtype) if in_place: - if "allgather" in execution_plan_name: + if "allgather" in collective: input_buf = cp.split(result_buf, num_ranks)[rank] else: input_buf = result_buf @@ -174,7 +174,6 @@ def build_bufs( def main( - execution_plan_name: str, execution_plan_path: str, size: int, in_place: bool = True, @@ -189,11 +188,12 @@ def main( npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) - execution_plan = ExecutionPlan(execution_plan_name, execution_plan_path) + execution_plan = ExecutionPlan(execution_plan_path) + collective = execution_plan.collective() dtype = parse_dtype(dtype_str) input_buf, result_buf, test_buf = build_bufs( - execution_plan_name, + collective, size, in_place, dtype, @@ -215,7 +215,7 @@ def main( mscclpp_group.barrier() bench_correctness( - execution_plan_name, + collective, input_buf, result_buf, test_buf, @@ -242,7 +242,6 @@ def main( if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("-n", "--execution_plan_name", type=str, required=True) parser.add_argument("-path", "--execution_plan_path", type=str, required=True) parser.add_argument("--size", type=str, required=True) parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") @@ -258,7 +257,6 @@ def main( buffer_size = parse_size(args.size) main( - args.execution_plan_name, args.execution_plan_path, buffer_size, args.in_place, diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index 1be0b1821..3e7fa90da 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -607,7 +607,7 @@ def test_executor(mpi_group: MpiGroup, filename: str): npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) - execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename)) + execution_plan = ExecutionPlan(os.path.join(project_dir, "test", "execution-files", filename)) nelems = 1024 * 1024 cp.random.seed(42) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 20226b661..144fb4174 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -94,8 +94,15 @@ std::set groupChannelType{mscclpp::ChannelType::NVLS}; namespace mscclpp { using json = nlohmann::json; -ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath) - : name(name), planPath(planPath), isUsingPacket(false) {} +ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) { + std::ifstream file(this->planPath); + json obj = json::parse(file); + this->name = obj["name"]; + this->collective = obj["collective"]; + this->isInPlace = obj["inplace"]; + this->minMessageSize = obj.value("min_message_size", 0); + this->maxMessageSize = obj.value("max_message_size", std::numeric_limits::max()); +} std::vector ExecutionPlan::Impl::getChannelInfos(int rank, ChannelType channelType) const { auto pred = [channelType](const ChannelInfo& info) { return info.channelType == channelType; }; @@ -187,6 +194,7 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, if (this->name != obj["name"]) { throw Error("Plan name does not match", ErrorCode::ExecutorError); } + this->collective = obj["collective"]; std::string protocol = obj["protocol"]; if (protocol == "LL") { this->isUsingPacket = true; @@ -194,6 +202,9 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, this->inputSize = inputSize; this->outputSize = outputSize; this->nThreadsPerBlock = obj.value("num_threads_per_block", 1024); + this->minMessageSize = obj.value("min_message_size", 0); + this->maxMessageSize = obj.value("max_message_size", std::numeric_limits::max()); + this->isInPlace = obj["inplace"]; const auto& gpus = obj["gpus"]; for (const auto& gpu : gpus) { @@ -549,7 +560,16 @@ void ExecutionPlan::Impl::reset() { void ExecutionPlan::Impl::operationsReset() { this->operations.clear(); } -ExecutionPlan::ExecutionPlan(const std::string& name, const std::string& planPath) - : impl_(std::make_shared(name, planPath)) {} +ExecutionPlan::ExecutionPlan(const std::string& planPath) : impl_(std::make_shared(planPath)) {} + +std::string ExecutionPlan::name() const { return this->impl_->name; } + +std::string ExecutionPlan::collective() const { return this->impl_->collective; } + +size_t ExecutionPlan::minMessageSize() const { return this->impl_->minMessageSize; } + +size_t ExecutionPlan::maxMessageSize() const { return this->impl_->maxMessageSize; } + +bool ExecutionPlan::isInPlace() const { return this->impl_->isInPlace; } } // namespace mscclpp diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 07292d748..8d291f45f 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -62,7 +62,7 @@ struct ChannelInfo { struct ExecutionPlan::Impl { public: - Impl(const std::string name, const std::string planPath); + Impl(const std::string planPath); ~Impl() = default; std::vector getChannelInfos(int rank, ChannelType channelType) const; @@ -85,7 +85,8 @@ struct ExecutionPlan::Impl { void reset(); void operationsReset(); - const std::string name; + std::string name; + std::string collective; const std::string planPath; bool isUsingPacket; // operations for [rank][threadblock] = [operations] @@ -106,6 +107,9 @@ struct ExecutionPlan::Impl { size_t inputSize; size_t outputSize; int nThreadsPerBlock; + size_t minMessageSize; + size_t maxMessageSize; + bool isInPlace; private: std::pair calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const; diff --git a/test/execution-files/allreduce.json b/test/execution-files/allreduce.json index afc921f4b..0a0a76590 100644 --- a/test/execution-files/allreduce.json +++ b/test/execution-files/allreduce.json @@ -1,6 +1,6 @@ { "name": "allreduce_pairs", - "colletive": "allreduce", + "collective": "allreduce", "protocol": "Simple", "inplace": true, "num_threads_per_block": 512, diff --git a/test/execution-files/allreduce_nvls.json b/test/execution-files/allreduce_nvls.json index 069b5df9d..e882d0285 100644 --- a/test/execution-files/allreduce_nvls.json +++ b/test/execution-files/allreduce_nvls.json @@ -1,6 +1,6 @@ { "name": "allreduce_nvls", - "colletive": "allreduce", + "collective": "allreduce", "protocol": "Simple", "inplace": true, "gpus": [ diff --git a/test/execution-files/allreduce_packet.json b/test/execution-files/allreduce_packet.json index d35a4e96b..545523447 100644 --- a/test/execution-files/allreduce_packet.json +++ b/test/execution-files/allreduce_packet.json @@ -1,6 +1,6 @@ { "name": "allreduce_pairs", - "colletive": "allreduce", + "collective": "allreduce", "protocol": "LL", "inplace": true, "num_threads_per_block": 768, diff --git a/test/execution-files/sendrecv.json b/test/execution-files/sendrecv.json index e84a06f7d..439d5f4f4 100644 --- a/test/execution-files/sendrecv.json +++ b/test/execution-files/sendrecv.json @@ -1,6 +1,6 @@ { "name": "send_recv", - "colletive": "sendrecv", + "collective": "sendrecv", "protocol": "Simple", "inplace": false, "gpus": [ diff --git a/test/execution-files/sendrecv_packet.json b/test/execution-files/sendrecv_packet.json index 3156b6191..5d63ad7c0 100644 --- a/test/execution-files/sendrecv_packet.json +++ b/test/execution-files/sendrecv_packet.json @@ -1,6 +1,6 @@ { "name": "send_recv", - "colletive": "sendrecv", + "collective": "sendrecv", "protocol": "LL", "inplace": false, "gpus": [ diff --git a/test/executor_test.cc b/test/executor_test.cc index e4ebcc972..68e8bfa32 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -89,9 +89,8 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s } int main(int argc, char* argv[]) { - if (argc != 6 && argc != 7) { + if (argc != 5 && argc != 6) { std::cerr << "Usage: " << argv[0] << " " - << " " << " " << " " << " " @@ -107,14 +106,13 @@ int main(int argc, char* argv[]) { MSCCLPP_CUDATHROW(cudaSetDevice(rank)); const size_t bufferSize = parseSize(argv[1]); - const std::string executionPlanName = argv[2]; - const std::string executionPlanPath = argv[3]; - const int niters = std::stoi(argv[4]); - const int ngraphIters = std::stoi(argv[5]); + const std::string executionPlanPath = argv[2]; + const int niters = std::stoi(argv[3]); + const int ngraphIters = std::stoi(argv[4]); const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); mscclpp::PacketType packetType = mscclpp::PacketType::LL16; - if (argc == 7) { - packetType = parsePacketType(argv[6]); + if (argc == 6) { + packetType = parsePacketType(argv[5]); } std::shared_ptr bootstrap; @@ -130,7 +128,7 @@ int main(int argc, char* argv[]) { NpKit::Init(rank); } - mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath); + mscclpp::ExecutionPlan plan(executionPlanPath); std::shared_ptr sendbuff; if (mscclpp::isNvlsSupported()) { sendbuff = mscclpp::allocSharedPhysicalCuda(bufferSize); diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc index 49952b6b4..116470dd1 100644 --- a/test/mp_unit/executor_tests.cc +++ b/test/mp_unit/executor_tests.cc @@ -55,7 +55,7 @@ TEST_F(ExecutorTest, TwoNodesAllreduce) { std::filesystem::path path = executablePath; std::filesystem::path executionFilesPath = path.parent_path().parent_path().parent_path() / "test/execution-files/allreduce.json"; - mscclpp::ExecutionPlan plan("allreduce_pairs", executionFilesPath.string()); + mscclpp::ExecutionPlan plan(executionFilesPath.string()); const int bufferSize = 1024 * 1024; std::shared_ptr sendbuff = mscclpp::allocExtSharedCuda(bufferSize); mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking);