diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu
index f95d2aa66..74c95bac8 100644
--- a/apps/nccl/src/nccl.cu
+++ b/apps/nccl/src/nccl.cu
@@ -585,8 +585,9 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t
   std::shared_ptr<mscclpp::ExecutionPlan> plan;
   void* basePtr = (char*)sendbuff - rank * bytes;
   bool inPlace = basePtr == recvbuff;
+  const size_t totalBytes = bytes * nRank;
   for (const auto& p : plans) {
-    if (bytes * nRank >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
+    if (totalBytes >= p.key.minMessageSize && totalBytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
       plan = p.plan;
       break;
     }
diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp
index 91261d16c..5d76983e6 100644
--- a/include/mscclpp/executor.hpp
+++ b/include/mscclpp/executor.hpp
@@ -25,7 +25,6 @@ enum class PacketType {
 
 class ExecutionPlan {
  public:
-  ExecutionPlan(const std::string& name, const std::string& planPath);
   ExecutionPlan(const std::string& planPath);
   ~ExecutionPlan() = default;
 
diff --git a/python/mscclpp/executor_py.cpp b/python/mscclpp/executor_py.cpp
index c550ecb00..d3add7194 100644
--- a/python/mscclpp/executor_py.cpp
+++ b/python/mscclpp/executor_py.cpp
@@ -22,7 +22,11 @@ void register_executor(nb::module_& m) {
   nb::enum_<PacketType>(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);
 
   nb::class_<ExecutionPlan>(m, "ExecutionPlan")
-      .def(nb::init<const std::string&, const std::string&>(), nb::arg("name"), nb::arg("planPath"));
+      .def(nb::init<const std::string&>(), nb::arg("planPath"))
+      .def("name", &ExecutionPlan::name)
+      .def("collective", &ExecutionPlan::collective)
+      .def("min_message_size", &ExecutionPlan::minMessageSize)
+      .def("max_message_size", &ExecutionPlan::maxMessageSize);
 
   nb::class_<Executor>(m, "Executor")
       .def(nb::init<std::shared_ptr<Communicator>>(), nb::arg("comm"))
diff --git a/python/test/executor_test.py b/python/test/executor_test.py
index 5197b79b9..67e9929f1 100644
--- a/python/test/executor_test.py
+++ b/python/test/executor_test.py
@@ -59,7 +59,7 @@ def bench_time(n_iters: int, n_graph_iters: int, func):
 
 
 def bench_correctness(
-    execution_plan_name: str,
+    collective: str,
     input_buf: cp.ndarray,
     result_buf: cp.ndarray,
     test_buf: cp.ndarray,
@@ -72,9 +72,9 @@ def bench_correctness(
     type_size = cp.dtype(parse_dtype(dtype_str)).itemsize
 
     fill_data_kernel_name = "fill_data_%s" % dtype_str
-    if "allgather" in execution_plan_name:
+    if "allgather" in collective:
         coll = "all_gather"
-    elif "reducescatter" in execution_plan_name:
+    elif "reducescatter" in collective:
         coll = "reduce_scatter"
     else:
         coll = "all_reduce"
@@ -142,7 +142,7 @@ def allocate_buffer(nelems, dtype):
 
 
 def build_bufs(
-    execution_plan_name: str,
+    collective: str,
     size: int,
     in_place: bool,
     dtype: cp.dtype,
@@ -153,7 +153,7 @@ def build_bufs(
     assert (size % type_size) == 0, "size %d not multiple of type size %d" % (size, type_size)
     nelems = size // type_size
 
-    if "allgather" in execution_plan_name:
+    if "allgather" in collective:
         assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks)
         nelems_input = nelems if in_place else nelems // num_ranks
     else:
@@ -162,7 +162,7 @@ def build_bufs(
     result_buf = allocate_buffer(nelems_output, dtype=dtype)
 
     if in_place:
-        if "allgather" in execution_plan_name:
+        if "allgather" in collective:
             input_buf = cp.split(result_buf, num_ranks)[rank]
         else:
             input_buf = result_buf
@@ -174,7 +174,6 @@
 
 
 def main(
-    execution_plan_name: str,
     execution_plan_path: str,
     size: int,
     in_place: bool = True,
@@ -189,11 +188,12 @@ def main(
     npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
     if npkit_dump_dir is not None:
         npkit.init(mscclpp_group.my_rank)
-    execution_plan = ExecutionPlan(execution_plan_name, execution_plan_path)
+    execution_plan = ExecutionPlan(execution_plan_path)
+    collective = execution_plan.collective()
 
     dtype = parse_dtype(dtype_str)
     input_buf, result_buf, test_buf = build_bufs(
-        execution_plan_name,
+        collective,
         size,
         in_place,
         dtype,
@@ -215,7 +215,7 @@ def main(
 
     mscclpp_group.barrier()
     bench_correctness(
-        execution_plan_name,
+        collective,
         input_buf,
         result_buf,
         test_buf,
@@ -242,7 +242,6 @@ def main(
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("-n", "--execution_plan_name", type=str, required=True)
    parser.add_argument("-path", "--execution_plan_path", type=str, required=True)
    parser.add_argument("--size", type=str, required=True)
    parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation")
@@ -258,7 +257,6 @@ def main(
 
     buffer_size = parse_size(args.size)
     main(
-        args.execution_plan_name,
         args.execution_plan_path,
         buffer_size,
         args.in_place,
diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc
index b5bc4bc99..144fb4174 100644
--- a/src/executor/execution_plan.cc
+++ b/src/executor/execution_plan.cc
@@ -94,9 +94,6 @@ std::set<mscclpp::ChannelType> groupChannelType{mscclpp::ChannelType::NVLS};
 namespace mscclpp {
 using json = nlohmann::json;
 
-ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath)
-    : name(name), planPath(planPath), isUsingPacket(false) {}
-
 ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) {
   std::ifstream file(this->planPath);
   json obj = json::parse(file);
@@ -563,9 +560,6 @@ void ExecutionPlan::Impl::reset() {
 
 void ExecutionPlan::Impl::operationsReset() { this->operations.clear(); }
 
-ExecutionPlan::ExecutionPlan(const std::string& name, const std::string& planPath)
-    : impl_(std::make_shared<Impl>(name, planPath)) {}
-
 ExecutionPlan::ExecutionPlan(const std::string& planPath) : impl_(std::make_shared<Impl>(planPath)) {}
 
 std::string ExecutionPlan::name() const { return this->impl_->name; }
diff --git a/test/executor_test.cc b/test/executor_test.cc
index e4ebcc972..68e8bfa32 100644
--- a/test/executor_test.cc
+++ b/test/executor_test.cc
@@ -89,9 +89,8 @@ double benchTime(int rank, std::shared_ptr<mscclpp::Bootstrap> bootstrap, std::s
 }
 
 int main(int argc, char* argv[]) {
-  if (argc != 6 && argc != 7) {
+  if (argc != 5 && argc != 6) {
     std::cerr << "Usage: " << argv[0] << " <buffer size>"
-              << " <execution plan name>"
               << " <execution plan path>"
               << " <number of iterations>"
               << " <number of graph iterations>"
@@ -107,14 +106,13 @@ int main(int argc, char* argv[]) {
   MSCCLPP_CUDATHROW(cudaSetDevice(rank));
 
   const size_t bufferSize = parseSize(argv[1]);
-  const std::string executionPlanName = argv[2];
-  const std::string executionPlanPath = argv[3];
-  const int niters = std::stoi(argv[4]);
-  const int ngraphIters = std::stoi(argv[5]);
+  const std::string executionPlanPath = argv[2];
+  const int niters = std::stoi(argv[3]);
+  const int ngraphIters = std::stoi(argv[4]);
   const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR");
   mscclpp::PacketType packetType = mscclpp::PacketType::LL16;
-  if (argc == 7) {
-    packetType = parsePacketType(argv[6]);
+  if (argc == 6) {
+    packetType = parsePacketType(argv[5]);
   }
 
   std::shared_ptr<mscclpp::Bootstrap> bootstrap;
@@ -130,7 +128,7 @@ int main(int argc, char* argv[]) {
     NpKit::Init(rank);
   }
 
-  mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath);
+  mscclpp::ExecutionPlan plan(executionPlanPath);
   std::shared_ptr<char> sendbuff;
   if (mscclpp::isNvlsSupported()) {
     sendbuff = mscclpp::allocSharedPhysicalCuda<char>(bufferSize);
diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc
index 49952b6b4..116470dd1 100644
--- a/test/mp_unit/executor_tests.cc
+++ b/test/mp_unit/executor_tests.cc
@@ -55,7 +55,7 @@ TEST_F(ExecutorTest, TwoNodesAllreduce) {
   std::filesystem::path path = executablePath;
   std::filesystem::path executionFilesPath =
       path.parent_path().parent_path().parent_path() / "test/execution-files/allreduce.json";
-  mscclpp::ExecutionPlan plan("allreduce_pairs", executionFilesPath.string());
+  mscclpp::ExecutionPlan plan(executionFilesPath.string());
   const int bufferSize = 1024 * 1024;
   std::shared_ptr<char> sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
   mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking);