
Commit

WIP
Binyang2014 committed Dec 2, 2024
1 parent 4d29958 commit bb25a03
Showing 7 changed files with 25 additions and 31 deletions.
3 changes: 2 additions & 1 deletion apps/nccl/src/nccl.cu
@@ -585,8 +585,9 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t
   std::shared_ptr<mscclpp::ExecutionPlan> plan;
   void* basePtr = (char*)sendbuff - rank * bytes;
   bool inPlace = basePtr == recvbuff;
+  const size_t totalBytes = bytes * nRank;
   for (const auto& p : plans) {
-    if (bytes * nRank >= p.key.minMessageSize && bytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
+    if (totalBytes >= p.key.minMessageSize && totalBytes < p.key.maxMessageSize && inPlace == p.key.isInPlace) {
       plan = p.plan;
       break;
     }
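
Note: the fix above makes both bounds of the plan-selection check use the same unit. Previously the lower bound was compared against the total message size (bytes * nRank) while the upper bound was compared against the per-rank size (bytes). A minimal sketch of the corrected rule, written in Python purely for illustration (select_plan and its attribute names are hypothetical, not the library API):

def select_plan(plans, bytes_per_rank, n_rank, in_place):
    # Both bounds now compare against the total message size across ranks.
    total_bytes = bytes_per_rank * n_rank
    for p in plans:
        if (p.key.min_message_size <= total_bytes < p.key.max_message_size
                and p.key.is_in_place == in_place):
            return p.plan
    return None  # no matching plan found
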
1 change: 0 additions & 1 deletion include/mscclpp/executor.hpp
@@ -25,7 +25,6 @@ enum class PacketType {

 class ExecutionPlan {
  public:
-  ExecutionPlan(const std::string& name, const std::string& planPath);
   ExecutionPlan(const std::string& planPath);
   ~ExecutionPlan() = default;

6 changes: 5 additions & 1 deletion python/mscclpp/executor_py.cpp
@@ -22,7 +22,11 @@ void register_executor(nb::module_& m) {
   nb::enum_<PacketType>(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);

   nb::class_<ExecutionPlan>(m, "ExecutionPlan")
-      .def(nb::init<const std::string, const std::string>(), nb::arg("name"), nb::arg("planPath"));
+      .def(nb::init<const std::string>(), nb::arg("planPath"))
+      .def("name", &ExecutionPlan::name)
+      .def("collective", &ExecutionPlan::collective)
+      .def("min_message_size", &ExecutionPlan::minMessageSize)
+      .def("max_message_size", &ExecutionPlan::maxMessageSize);

   nb::class_<Executor>(m, "Executor")
       .def(nb::init<std::shared_ptr<Communicator>>(), nb::arg("comm"))
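
With the name argument dropped from the constructor, plan metadata now flows the other way: it is read out of the loaded plan through the new accessors. A minimal usage sketch, assuming the bindings are imported as in the Python test below; the import form and file path are illustrative:

from mscclpp import ExecutionPlan

plan = ExecutionPlan("/path/to/allreduce.json")  # no name argument anymore
print(plan.name(), plan.collective())            # metadata comes from the plan file
print(plan.min_message_size(), plan.max_message_size())
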
22 changes: 10 additions & 12 deletions python/test/executor_test.py
@@ -59,7 +59,7 @@ def bench_time(n_iters: int, n_graph_iters: int, func):


 def bench_correctness(
-    execution_plan_name: str,
+    collective: str,
     input_buf: cp.ndarray,
     result_buf: cp.ndarray,
     test_buf: cp.ndarray,
@@ -72,9 +72,9 @@ def bench_correctness(
     type_size = cp.dtype(parse_dtype(dtype_str)).itemsize

     fill_data_kernel_name = "fill_data_%s" % dtype_str
-    if "allgather" in execution_plan_name:
+    if "allgather" in collective:
         coll = "all_gather"
-    elif "reducescatter" in execution_plan_name:
+    elif "reducescatter" in collective:
         coll = "reduce_scatter"
     else:
         coll = "all_reduce"
@@ -142,7 +142,7 @@ def allocate_buffer(nelems, dtype):


 def build_bufs(
-    execution_plan_name: str,
+    collective: str,
     size: int,
     in_place: bool,
     dtype: cp.dtype,
@@ -153,7 +153,7 @@
     assert (size % type_size) == 0, "size %d not multiple of type size %d" % (size, type_size)
     nelems = size // type_size

-    if "allgather" in execution_plan_name:
+    if "allgather" in collective:
         assert (nelems % num_ranks) == 0, "nelems %d not multiple of num_ranks %d" % (nelems, num_ranks)
         nelems_input = nelems if in_place else nelems // num_ranks
     else:
@@ -162,7 +162,7 @@

     result_buf = allocate_buffer(nelems_output, dtype=dtype)
     if in_place:
-        if "allgather" in execution_plan_name:
+        if "allgather" in collective:
             input_buf = cp.split(result_buf, num_ranks)[rank]
         else:
             input_buf = result_buf
@@ -174,7 +174,6 @@


 def main(
-    execution_plan_name: str,
     execution_plan_path: str,
     size: int,
     in_place: bool = True,
@@ -189,11 +188,12 @@ def main(
     npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR")
     if npkit_dump_dir is not None:
         npkit.init(mscclpp_group.my_rank)
-    execution_plan = ExecutionPlan(execution_plan_name, execution_plan_path)
+    execution_plan = ExecutionPlan(execution_plan_path)
+    collective = execution_plan.collective()

     dtype = parse_dtype(dtype_str)
     input_buf, result_buf, test_buf = build_bufs(
-        execution_plan_name,
+        collective,
         size,
         in_place,
         dtype,
@@ -215,7 +215,7 @@

     mscclpp_group.barrier()
     bench_correctness(
-        execution_plan_name,
+        collective,
         input_buf,
         result_buf,
         test_buf,
@@ -242,7 +242,6 @@

 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("-n", "--execution_plan_name", type=str, required=True)
     parser.add_argument("-path", "--execution_plan_path", type=str, required=True)
     parser.add_argument("--size", type=str, required=True)
     parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation")
@@ -258,7 +257,6 @@

     buffer_size = parse_size(args.size)
     main(
-        args.execution_plan_name,
         args.execution_plan_path,
         buffer_size,
         args.in_place,
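
Since -n/--execution_plan_name is gone, the test now needs only the plan path; the collective is inferred from the plan itself. A hypothetical invocation (launcher, rank count, path, and size are illustrative):

mpirun -np 8 python3 python/test/executor_test.py -path /path/to/allreduce.json --size 1M --in_place
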
6 changes: 0 additions & 6 deletions src/executor/execution_plan.cc
@@ -94,9 +94,6 @@ std::set<mscclpp::ChannelType> groupChannelType{mscclpp::ChannelType::NVLS};
 namespace mscclpp {
 using json = nlohmann::json;

-ExecutionPlan::Impl::Impl(const std::string name, const std::string planPath)
-    : name(name), planPath(planPath), isUsingPacket(false) {}
-
 ExecutionPlan::Impl::Impl(const std::string planPath) : planPath(planPath), isUsingPacket(false) {
   std::ifstream file(this->planPath);
   json obj = json::parse(file);
@@ -563,9 +560,6 @@ void ExecutionPlan::Impl::reset() {

 void ExecutionPlan::Impl::operationsReset() { this->operations.clear(); }

-ExecutionPlan::ExecutionPlan(const std::string& name, const std::string& planPath)
-    : impl_(std::make_shared<Impl>(name, planPath)) {}
-
 ExecutionPlan::ExecutionPlan(const std::string& planPath) : impl_(std::make_shared<Impl>(planPath)) {}

 std::string ExecutionPlan::name() const { return this->impl_->name; }
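
With only the path-taking Impl constructor left, the plan's name (returned by ExecutionPlan::name() above) must now be populated while parsing the plan JSON. A sketch of the header fields such a file would carry, with field names inferred from the accessors added in this commit rather than confirmed by the diff:

{
  "name": "allreduce_pairs",
  "collective": "allreduce",
  "inplace": true,
  "min_message_size": 0,
  "max_message_size": 1048576
}
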
16 changes: 7 additions & 9 deletions test/executor_test.cc
@@ -89,9 +89,8 @@ double benchTime(int rank, std::shared_ptr<mscclpp::Bootstrap> bootstrap, std::s
 }

 int main(int argc, char* argv[]) {
-  if (argc != 6 && argc != 7) {
+  if (argc != 5 && argc != 6) {
     std::cerr << "Usage: " << argv[0] << " <buffer size>"
-              << " <execution plan name>"
               << " <execution plan path>"
               << " <number of iterations>"
               << " <number of graph iterations>"
@@ -107,14 +106,13 @@ int main(int argc, char* argv[]) {
   MSCCLPP_CUDATHROW(cudaSetDevice(rank));

   const size_t bufferSize = parseSize(argv[1]);
-  const std::string executionPlanName = argv[2];
-  const std::string executionPlanPath = argv[3];
-  const int niters = std::stoi(argv[4]);
-  const int ngraphIters = std::stoi(argv[5]);
+  const std::string executionPlanPath = argv[2];
+  const int niters = std::stoi(argv[3]);
+  const int ngraphIters = std::stoi(argv[4]);
   const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR");
   mscclpp::PacketType packetType = mscclpp::PacketType::LL16;
-  if (argc == 7) {
-    packetType = parsePacketType(argv[6]);
+  if (argc == 6) {
+    packetType = parsePacketType(argv[5]);
   }

   std::shared_ptr<mscclpp::TcpBootstrap> bootstrap;
@@ -130,7 +128,7 @@
     NpKit::Init(rank);
   }

-  mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath);
+  mscclpp::ExecutionPlan plan(executionPlanPath);
   std::shared_ptr<char> sendbuff;
   if (mscclpp::isNvlsSupported()) {
     sendbuff = mscclpp::allocSharedPhysicalCuda<char>(bufferSize);
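
The test binary now takes one fewer positional argument: buffer size, plan path, iteration counts, and an optional packet type. A hypothetical invocation (launcher, binary name, path, and values are illustrative):

mpirun -np 2 ./executor_test 1M /path/to/allreduce.json 10 10 LL16
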
2 changes: 1 addition & 1 deletion test/mp_unit/executor_tests.cc
@@ -55,7 +55,7 @@ TEST_F(ExecutorTest, TwoNodesAllreduce) {
   std::filesystem::path path = executablePath;
   std::filesystem::path executionFilesPath =
       path.parent_path().parent_path().parent_path() / "test/execution-files/allreduce.json";
-  mscclpp::ExecutionPlan plan("allreduce_pairs", executionFilesPath.string());
+  mscclpp::ExecutionPlan plan(executionFilesPath.string());
   const int bufferSize = 1024 * 1024;
   std::shared_ptr<char> sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
   mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking);
