From 93628d20665bc9f827a005684886c11b14d97225 Mon Sep 17 00:00:00 2001
From: Caio Rocha <164253795+caiomcbr@users.noreply.github.com>
Date: Sat, 23 Nov 2024 12:15:56 -0800
Subject: [PATCH 1/4] Fixing Message Boundary AllReduce Fallback Code (#391)
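
The boundary in ncclAllReduceFallback picks between the packet-based
kernels, which stage data through the communicator's pre-allocated
scratch buffers, and the direct kernels. Tying that choice to the
env-tunable largeMessageSizeBoundary (ALLREDUCE_LARGE_MSG_BOUNDARY)
could route messages larger than the scratch layout supports into the
scratch path, so the check is pinned back to a fixed 1 MiB. A minimal
sketch of the intended gating; illustrative only, with SCRATCH_CAPACITY
standing in for the real scratch sizing, not the library API:

    // Sketch: the scratch path is gated by a fixed capacity,
    // independent of the plan-selection knobs read from the environment.
    constexpr size_t SCRATCH_CAPACITY = 1 << 20;  // 1 MiB

    static bool useScratchChannels(size_t count, size_t elemSize) {
      return count * elemSize <= SCRATCH_CAPACITY;  // mirrors the hunk below
    }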
---
 apps/nccl/src/nccl.cu | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu
index a414ffe85..2bd2a4422 100644
--- a/apps/nccl/src/nccl.cu
+++ b/apps/nccl/src/nccl.cu
@@ -216,7 +216,7 @@ static ncclResult_t ncclAllReduceFallback(const void* sendbuff, void* recvbuff,
   mscclpp::DeviceHandle<mscclpp::SmChannel>* smOutChannels = nullptr;
 
   // Creating the channels
-  if (count * ncclTypeSize(datatype) <= comm->largeMessageSizeBoundary) {
+  if (count * ncclTypeSize(datatype) <= (1 << 20)) {
     auto sendIt = comm->channelScratchInfos.find(sendKey);
     if (sendIt == comm->channelScratchInfos.end()) {
       std::vector<mscclpp::SmChannel> channels =

From 1b8d0206500936b8f34cdd2a0588a629b95c6ebf Mon Sep 17 00:00:00 2001
From: Binyang Li <binyli@microsoft.com>
Date: Mon, 25 Nov 2024 11:59:51 -0800
Subject: [PATCH 2/4] Fix mscclpp_benchmark (#392)

Enable 1GB message size for NVLS transport in mscclpp_benchmark
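
Why 2**30: the sweep below runs nelems = 2**i for i in range(10, 29),
i.e. up to 2**28 elements; at 4 bytes per element (the benchmark's
float32 buffers) the largest message is 2**28 * 4 = 2**30 bytes = 1 GiB,
so the NVLS collective connection must be created over a 1 GiB buffer
for the final step of the sweep to fit.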
---
 python/mscclpp/comm.py                      | 2 +-
 python/mscclpp_benchmark/allreduce_bench.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/mscclpp/comm.py b/python/mscclpp/comm.py
index ca3620924..f370b9441 100644
--- a/python/mscclpp/comm.py
+++ b/python/mscclpp/comm.py
@@ -99,7 +99,7 @@ def make_connection(
             else:
                 endpoint = endpoints
             if endpoint.transport == Transport.Nvls:
-                return connect_nvls_collective(self.communicator, all_ranks)
+                return connect_nvls_collective(self.communicator, all_ranks, 2**30)
             else:
                 connections[rank] = self.communicator.connect_on_setup(rank, 0, endpoint)
         self.communicator.setup()
diff --git a/python/mscclpp_benchmark/allreduce_bench.py b/python/mscclpp_benchmark/allreduce_bench.py
index 69e4f3adc..e93c0479e 100644
--- a/python/mscclpp_benchmark/allreduce_bench.py
+++ b/python/mscclpp_benchmark/allreduce_bench.py
@@ -289,7 +289,7 @@ def get_netinterface_info():
 mscclpp_algbw = []
 nccl_algbw = []
 speed_ups = []
-end_range = 28 if is_nvls_supported() else 29
+end_range = 29
 for i in range(10, end_range):
     if MPI.COMM_WORLD.size // N_GPUS_PER_NODE == 1:
         nelems = 2**i

From 593478e1b74d808bd2c43446ad64729b363472c3 Mon Sep 17 00:00:00 2001
From: Binyang Li <binyli@microsoft.com>
Date: Mon, 25 Nov 2024 21:13:30 -0800
Subject: [PATCH 3/4] Add cross threadblock barrier (#383)
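
The executor previously mapped the plan DSL's "nop" to
OperationType::BARRIER, whose handler only issued a block-local
__syncthreads(). This change separates the two: NOP keeps the
block-local sync, while BARRIER becomes a real cross-threadblock
barrier backed by a device-global pool of mscclpp::DeviceSyncer
objects, selected by the plan's "barrier_id" field and spanning
"nthread_blocks" thread blocks. A minimal sketch of the device-side
pattern, with a simplified two-phase kernel standing in for the real
executionKernel dispatch:

    // Sketch of the new barrier semantics; assumes
    // <mscclpp/concurrency_device.hpp> and a launch with exactly
    // nThreadBlocks blocks.
    #include <mscclpp/concurrency_device.hpp>

    #define MAX_DEVICE_SYNCERS 16
    __device__ mscclpp::DeviceSyncer deviceSyncers[MAX_DEVICE_SYNCERS];

    __global__ void twoPhaseKernel(int barrierId, int nThreadBlocks) {
      // phase 1: each block writes its share of the data ...
      __syncthreads();                               // NOP: one block only
      deviceSyncers[barrierId].sync(nThreadBlocks);  // BARRIER: all blocks
      // phase 2: blocks may now read each other's phase-1 results ...
    }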
---
 include/mscclpp/npkit/npkit_event.hpp |  2 +-
 src/executor/execution_plan.cc        |  8 ++++++++
 src/include/execution_common.hpp      | 20 +++++++++++++++-----
 src/include/execution_kernel.hpp      | 10 +++++++++-
 test/executor_test.cc                 | 11 ++++++-----
 tools/npkit/npkit_trace_generator.py  |  1 +
 6 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/include/mscclpp/npkit/npkit_event.hpp b/include/mscclpp/npkit/npkit_event.hpp
index 1a24b241f..cb1925626 100644
--- a/include/mscclpp/npkit/npkit_event.hpp
+++ b/include/mscclpp/npkit/npkit_event.hpp
@@ -13,6 +13,6 @@
 #define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4
 
 #define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5
-#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x17
+#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x18
 
 #endif
diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc
index 49ceddf0a..20226b661 100644
--- a/src/executor/execution_plan.cc
+++ b/src/executor/execution_plan.cc
@@ -17,6 +17,8 @@ std::vector<T> filter(const std::vector<T>& vec, Predicate pred) {
 
 auto getOpType = [](const std::string& str) {
   if (str == "nop") {
+    return mscclpp::OperationType::NOP;
+  } else if (str == "barrier") {
     return mscclpp::OperationType::BARRIER;
   } else if (str == "put") {
     return mscclpp::OperationType::PUT;
@@ -456,6 +458,12 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffse
         operation.size =
             this->getNChunkSize(rank, this->inputSize, this->outputSize, (uint32_t)op["cnt"], chunkIndexes);
       }
+      if (op.contains("barrier_id")) {
+        operation.deviceSyncerIndex = op["barrier_id"];
+      }
+      if (op.contains("nthread_blocks")) {
+        operation.nThreadBlocks = op["nthread_blocks"];
+      }
       ops.push_back(operation);
     }
     this->operations[rank].push_back(ops);
diff --git a/src/include/execution_common.hpp b/src/include/execution_common.hpp
index f4f4fbd8c..d0d0dc30d 100644
--- a/src/include/execution_common.hpp
+++ b/src/include/execution_common.hpp
@@ -30,6 +30,7 @@ enum class ChannelType : uint8_t {
 
 // NOTE(chhwang): any modification here requires corresponding updates in `tools/npkit/npkit_trace_generator.py`.
 enum class OperationType : uint8_t {
+  NOP,
   BARRIER,
   PUT,
   PUT_PACKET,
@@ -78,11 +79,20 @@ struct Operation {
     BufferType outputBufferType;
     uint8_t nvlsOutputIndex;
   };
-  uint32_t inputOffsets[MAX_CHANNEL_PER_OPERATION];
-  uint32_t outputOffsets[MAX_CHANNEL_PER_OPERATION];
-  uint32_t srcOffset;
-  uint32_t dstOffset;
-  uint32_t size;
+  union {
+    // For Barrier operation
+    struct {
+      uint32_t deviceSyncerIndex;
+      uint32_t nThreadBlocks;
+    };
+    struct {
+      uint32_t inputOffsets[MAX_CHANNEL_PER_OPERATION];
+      uint32_t outputOffsets[MAX_CHANNEL_PER_OPERATION];
+      uint32_t srcOffset;
+      uint32_t dstOffset;
+      uint32_t size;
+    };
+  };
 };
 
 // total size = 2304 + 6400 + 4 + 12(padding) = 8720 bytes
diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp
index 1e9d6ac57..1b0490f91 100644
--- a/src/include/execution_kernel.hpp
+++ b/src/include/execution_kernel.hpp
@@ -8,6 +8,7 @@
 #if defined(ENABLE_NPKIT)
 #include <mscclpp/npkit/npkit.hpp>
 #endif
+#include <mscclpp/concurrency_device.hpp>
 #include <mscclpp/packet_device.hpp>
 #include <mscclpp/proxy_channel.hpp>
 #include <mscclpp/sm_channel.hpp>
@@ -172,6 +173,9 @@ struct VectorType {
 
 namespace mscclpp {
 
+#define MAX_DEVICE_SYNCERS 16
+__device__ DeviceSyncer deviceSyncers[MAX_DEVICE_SYNCERS];
+
 #if defined(MSCCLPP_DEVICE_COMPILE)
 
 template <typename T>
@@ -526,8 +530,12 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu
                           event_buffer, &event_buffer_head);
 #endif
-    if (op.type == OperationType::BARRIER) {
+    if (op.type == OperationType::NOP) {
       __syncthreads();
+    } else if (op.type == OperationType::BARRIER) {
+      int nThreadBlocks = op.nThreadBlocks;
+      int syncStateIndex = op.deviceSyncerIndex;
+      deviceSyncers[syncStateIndex].sync(nThreadBlocks);
     } else if (op.type == OperationType::SIGNAL) {
       handleSignal(smChannels, proxyChannels, op.outputChannelIndexes, op.nOutputs, op.channelType);
     } else if (op.type == OperationType::WAIT) {
diff --git a/test/executor_test.cc b/test/executor_test.cc
index 3fc0b1e21..e4ebcc972 100644
--- a/test/executor_test.cc
+++ b/test/executor_test.cc
@@ -131,11 +131,12 @@ int main(int argc, char* argv[]) {
   }
   mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath);
 
-#if (CUDA_NVLS_SUPPORTED)
-  std::shared_ptr<char> sendbuff = mscclpp::allocSharedPhysicalCuda<char>(bufferSize);
-#else
-  std::shared_ptr<char> sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
-#endif
+  std::shared_ptr<char> sendbuff;
+  if (mscclpp::isNvlsSupported()) {
+    sendbuff = mscclpp::allocSharedPhysicalCuda<char>(bufferSize);
+  } else {
+    sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
+  }
   std::vector<int> dataHost(bufferSize / sizeof(int), rank);
   MSCCLPP_CUDATHROW(cudaMemcpy(sendbuff.get(), dataHost.data(), bufferSize, cudaMemcpyHostToDevice));
   double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, niters, ngraphIters, packetType);
diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py
index 9a5b88b44..31c2e1622 100644
--- a/tools/npkit/npkit_trace_generator.py
+++ b/tools/npkit/npkit_trace_generator.py
@@ -11,6 +11,7 @@ def parse_npkit_event_header(npkit_event_header_path):
     npkit_event_def = {"id_to_type": {}, "type_to_id": {}}
 
     executor_ops = [
+        "NOP",
         "BARRIER",
         "PUT",
         "PUT_PACKET",

From d9c297ba14dd219875dc4e691899c3668c86efbe Mon Sep 17 00:00:00 2001
From: Caio Rocha <164253795+caiomcbr@users.noreply.github.com>
Date: Wed, 27 Nov 2024 17:05:51 -0800
Subject: [PATCH 4/4] AllGather Executor Support in NCCL Interface (#393)
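
ncclAllGather now follows the same structure as ncclAllReduce: the total
gathered size (sendcount * typesize * nranks) is compared against
ALLGATHER_SMALL_MSG_BOUNDARY (default 1 KiB) and
ALLGATHER_LARGE_MSG_BOUNDARY (default 1 MiB). Below the small boundary,
or whenever the required plan was not loaded from its *_JSON_FILE
environment variable, the call takes the hand-written kernel, now moved
into ncclAllGatherFallback; in between it uses the packet plans; above,
the non-packet plans. In-place vs. out-of-place is decided by whether
sendbuff aliases this rank's slot of recvbuff. A condensed restatement
of that selection for readability; pickAllGatherPlan is illustrative,
not a function added by this patch:

    // Sketch: plan selection as implemented in the diff below.
    std::shared_ptr<mscclpp::ExecutionPlan> pickAllGatherPlan(
        ncclComm* comm, const void* sendbuff, const void* recvbuff,
        size_t bytes /* per rank */, int rank, int nRank) {
      size_t total = bytes * nRank;  // size of the fully gathered buffer
      if (total < comm->allGatherSmallMessageSizeBoundary)
        return nullptr;  // nullptr -> fallback kernel
      // In-place: this rank's input already sits in its slot of recvbuff.
      bool inPlace = sendbuff == (const char*)recvbuff + rank * bytes;
      if (total <= comm->allGatherLargeMessageSizeBoundary)
        return inPlace ? comm->allGatherPacketIPPlan : comm->allGatherPacketOPPlan;
      return inPlace ? comm->allGatherIPPlan : comm->allGatherOPPlan;
    }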
Co-authored-by: Ziyue Yang <ziyyang@microsoft.com>
Co-authored-by: Changho Hwang <changhohwang@microsoft.com>
Co-authored-by: Binyang Li <binyli@microsoft.com>
---
 apps/nccl/src/nccl.cu | 138 +++++++++++++++++++++++++++++++-----------
 1 file changed, 104 insertions(+), 34 deletions(-)

diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu
index 2bd2a4422..2b7e97360 100644
--- a/apps/nccl/src/nccl.cu
+++ b/apps/nccl/src/nccl.cu
@@ -58,7 +58,7 @@ struct ncclComm {
   std::vector<std::shared_ptr<mscclpp::SmDevice2DeviceSemaphore>> smSemaphores;
   std::shared_ptr<mscclpp::Executor> executor;
   std::shared_ptr<mscclpp::ExecutionPlan> allReducePacketIPPlan, allReducePacketOPPlan, allReduceIPPlan,
-      allReduceOPPlan;
+      allReduceOPPlan, allGatherIPPlan, allGatherOPPlan, allGatherPacketIPPlan, allGatherPacketOPPlan;
 
   std::unordered_map<channelKey, ChannelInfo> channelInInfos;
   std::unordered_map<channelKey, ChannelInfo> channelOutInfos;
@@ -66,7 +66,8 @@ struct ncclComm {
   std::shared_ptr<char> scratchBuff;
   std::vector<mscclpp::RegisteredMemory> remoteScratchRegMemories;
 
-  size_t smallMessageSizeBoundary, largeMessageSizeBoundary;
+  size_t allReduceSmallMessageSizeBoundary, allReduceLargeMessageSizeBoundary;
+  size_t allGatherSmallMessageSizeBoundary, allGatherLargeMessageSizeBoundary;
   uint32_t numScratchBuff;
   uint32_t buffFlag;
 };
@@ -279,6 +280,46 @@ static ncclResult_t ncclAllReduceFallback(const void* sendbuff, void* recvbuff,
   return ncclSuccess;
 }
 
+static ncclResult_t ncclAllGatherFallback(const void* sendbuff, void* recvbuff, size_t sendcount,
+                                          ncclDataType_t datatype, ncclComm_t comm, cudaStream_t stream) {
+  size_t bytes = sendcount * ncclTypeSize(datatype);
+  if (sendbuff == nullptr || recvbuff == nullptr || bytes == 0 || comm == nullptr) return ncclInvalidArgument;
+
+  // Declarating variables
+  size_t recvBytes;
+  CUdeviceptr recvBasePtr;
+  MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff));
+  size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr;
+  channelKey recvKey{(void*)recvBasePtr, recvBytes};
+  int rank = comm->comm->bootstrap()->getRank();
+  int nRank = comm->comm->bootstrap()->getNranks();
+  mscclpp::DeviceHandle<mscclpp::SmChannel>* smChannels = nullptr;
+
+  auto it = comm->channelOutInfos.find(recvKey);
+  if (it == comm->channelOutInfos.end()) {
+    std::vector<mscclpp::RegisteredMemory> remoteMemories = setupRemoteMemories(
+        comm->comm, rank, const_cast<void*>((void*)recvBasePtr), recvBytes, mscclpp::Transport::CudaIpc);
+    std::vector<mscclpp::SmChannel> channels =
+        setupSmChannels(comm, remoteMemories, const_cast<void*>((void*)recvBasePtr));
+    std::vector<mscclpp::DeviceHandle<mscclpp::SmChannel>> smChannelDeviceHandles;
+    std::transform(channels.begin(), channels.end(), std::back_inserter(smChannelDeviceHandles),
+                   [](const mscclpp::SmChannel& smChannel) { return mscclpp::deviceHandle(smChannel); });
+    ChannelInfo channelInfo{channels, setupSmChannelDeviceHandles(channels)};
+    it = comm->channelOutInfos.emplace(recvKey, channelInfo).first;
+  }
+
+  smChannels = it->second.smChannelDeviceHandles.get();
+  if ((char*)sendbuff == (char*)recvbuff + rank * sendcount) {
+    CUDACHECK(allgather<false>((int*)sendbuff, (int*)nullptr, (int*)recvbuff, smChannels, offsetOut, rank,
+                               NRANKS_PER_NODE, nRank, bytes / sizeof(int), stream));
+  } else {
+    CUDACHECK(allgather<true>((int*)sendbuff, (int*)nullptr, (int*)recvbuff, smChannels, offsetOut, rank,
+                              NRANKS_PER_NODE, nRank, bytes / sizeof(int), stream));
+  }
+
+  return ncclSuccess;
+}
+
 NCCL_API ncclResult_t ncclGetVersion(int* version) {
   if (version == nullptr) return ncclInvalidArgument;
   *version = MSCCLPP_VERSION;
@@ -355,15 +396,39 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI
     commPtr->allReduceOPPlan =
         std::make_shared<mscclpp::ExecutionPlan>(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_OP_JSON_FILE")));
   if (getenv("ALLREDUCE_SMALL_MSG_BOUNDARY"))
-    commPtr->smallMessageSizeBoundary = parseSize(getenv("ALLREDUCE_SMALL_MSG_BOUNDARY"));
+    commPtr->allReduceSmallMessageSizeBoundary = parseSize(getenv("ALLREDUCE_SMALL_MSG_BOUNDARY"));
   else
-    commPtr->smallMessageSizeBoundary = 16 * (1 << 10);
+    commPtr->allReduceSmallMessageSizeBoundary = 16 * (1 << 10);
 
   if (getenv("ALLREDUCE_LARGE_MSG_BOUNDARY"))
-    commPtr->largeMessageSizeBoundary = parseSize(getenv("ALLREDUCE_LARGE_MSG_BOUNDARY"));
+    commPtr->allReduceLargeMessageSizeBoundary = parseSize(getenv("ALLREDUCE_LARGE_MSG_BOUNDARY"));
+  else
+    commPtr->allReduceLargeMessageSizeBoundary = 1 << 20;
+
+  if (getenv("ALLGATHERPKT_IP_JSON_FILE"))
+    commPtr->allGatherPacketIPPlan = std::make_shared<mscclpp::ExecutionPlan>(
+        mscclpp::ExecutionPlan("allgather_pkt", getenv("ALLGATHERPKT_IP_JSON_FILE")));
+  if (getenv("ALLGATHERPKT_OP_JSON_FILE"))
+    commPtr->allGatherPacketOPPlan = std::make_shared<mscclpp::ExecutionPlan>(
+        mscclpp::ExecutionPlan("allgather_pkt", getenv("ALLGATHERPKT_OP_JSON_FILE")));
+  if (getenv("ALLGATHER_IP_JSON_FILE"))
+    commPtr->allGatherIPPlan =
+        std::make_shared<mscclpp::ExecutionPlan>(mscclpp::ExecutionPlan("allgather", getenv("ALLGATHER_IP_JSON_FILE")));
+  if (getenv("ALLGATHER_OP_JSON_FILE"))
+    commPtr->allGatherOPPlan =
+        std::make_shared<mscclpp::ExecutionPlan>(mscclpp::ExecutionPlan("allgather", getenv("ALLGATHER_OP_JSON_FILE")));
+  if (getenv("ALLGATHER_SMALL_MSG_BOUNDARY"))
+    commPtr->allGatherSmallMessageSizeBoundary = parseSize(getenv("ALLGATHER_SMALL_MSG_BOUNDARY"));
   else
-    commPtr->largeMessageSizeBoundary = 1 << 20;
+    commPtr->allGatherSmallMessageSizeBoundary = (1 << 10);
+  if (getenv("ALLGATHER_LARGE_MSG_BOUNDARY"))
+    commPtr->allGatherLargeMessageSizeBoundary = parseSize(getenv("ALLGATHER_LARGE_MSG_BOUNDARY"));
+  else
+    commPtr->allGatherLargeMessageSizeBoundary = 1 << 20;
 
-  if (commPtr->smallMessageSizeBoundary > commPtr->largeMessageSizeBoundary) return ncclInvalidArgument;
+  if (commPtr->allReduceSmallMessageSizeBoundary > commPtr->allReduceLargeMessageSizeBoundary)
+    return ncclInvalidArgument;
+  if (commPtr->allGatherSmallMessageSizeBoundary > commPtr->allGatherLargeMessageSizeBoundary)
+    return ncclInvalidArgument;
 
   *comm = commPtr;
   return ncclSuccess;
@@ -483,11 +548,11 @@ NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t
   size_t bytes = count * ncclTypeSize(datatype);
   int rank = comm->comm->bootstrap()->getRank();
 
-  if (bytes < comm->smallMessageSizeBoundary) {
+  if (bytes < comm->allReduceSmallMessageSizeBoundary) {
     return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream);
   } else {
     std::shared_ptr<mscclpp::ExecutionPlan> plan;
-    if (bytes <= comm->largeMessageSizeBoundary)
+    if (bytes <= comm->allReduceLargeMessageSizeBoundary)
      plan = (sendbuff == recvbuff) ? comm->allReducePacketIPPlan : comm->allReducePacketOPPlan;
     else {
       plan = (sendbuff == recvbuff) ? comm->allReduceIPPlan : comm->allReduceOPPlan;
@@ -533,36 +598,41 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t
   size_t bytes = sendcount * ncclTypeSize(datatype);
   if (sendbuff == nullptr || recvbuff == nullptr || bytes == 0 || comm == nullptr) return ncclInvalidArgument;
 
-  // Declarating variables
-  size_t recvBytes;
-  CUdeviceptr recvBasePtr;
-  MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff));
-  size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr;
-  channelKey recvKey{(void*)recvBasePtr, recvBytes};
   int rank = comm->comm->bootstrap()->getRank();
   int nRank = comm->comm->bootstrap()->getNranks();
-  mscclpp::DeviceHandle<mscclpp::SmChannel>* smChannels = nullptr;
 
-  auto it = comm->channelOutInfos.find(recvKey);
-  if (it == comm->channelOutInfos.end()) {
-    std::vector<mscclpp::RegisteredMemory> remoteMemories = setupRemoteMemories(
-        comm->comm, rank, const_cast<void*>((void*)recvBasePtr), recvBytes, mscclpp::Transport::CudaIpc);
-    std::vector<mscclpp::SmChannel> channels =
-        setupSmChannels(comm, remoteMemories, const_cast<void*>((void*)recvBasePtr));
-    std::vector<mscclpp::DeviceHandle<mscclpp::SmChannel>> smChannelDeviceHandles;
-    std::transform(channels.begin(), channels.end(), std::back_inserter(smChannelDeviceHandles),
-                   [](const mscclpp::SmChannel& smChannel) { return mscclpp::deviceHandle(smChannel); });
-    ChannelInfo channelInfo{channels, setupSmChannelDeviceHandles(channels)};
-    it = comm->channelOutInfos.emplace(recvKey, channelInfo).first;
+  if (bytes * nRank < comm->allGatherSmallMessageSizeBoundary)
+    return ncclAllGatherFallback(sendbuff, recvbuff, sendcount, datatype, comm, stream);
+
+  std::shared_ptr<mscclpp::ExecutionPlan> plan;
+  if (bytes * nRank <= comm->allGatherLargeMessageSizeBoundary)
+    plan = (sendbuff == (char*)recvbuff + rank * bytes) ? comm->allGatherPacketIPPlan : comm->allGatherPacketOPPlan;
+  else {
+    plan = (sendbuff == (char*)recvbuff + rank * bytes) ? comm->allGatherIPPlan : comm->allGatherOPPlan;
   }
 
-  smChannels = it->second.smChannelDeviceHandles.get();
-  if ((char*)sendbuff == (char*)recvbuff + rank * sendcount) {
-    CUDACHECK(allgather<false>((int*)sendbuff, (int*)nullptr, (int*)recvbuff, smChannels, offsetOut, rank,
-                               NRANKS_PER_NODE, nRank, bytes / sizeof(int), stream));
-  } else {
-    CUDACHECK(allgather<true>((int*)sendbuff, (int*)nullptr, (int*)recvbuff, smChannels, offsetOut, rank,
-                              NRANKS_PER_NODE, nRank, bytes / sizeof(int), stream));
+  if (plan == nullptr) return ncclAllGatherFallback(sendbuff, recvbuff, sendcount, datatype, comm, stream);
+
+  switch (datatype) {
+    case ncclFloat16:
+      comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes * nRank, mscclpp::DataType::FLOAT16,
+                              *plan, stream);
+      break;
+    case ncclFloat32:
+      comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes * nRank,
+                              mscclpp::DataType::FLOAT32, *plan, stream);
+      break;
+    case ncclBfloat16:
+      comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes * nRank,
+                              mscclpp::DataType::BFLOAT16, *plan, stream);
+      break;
+    case ncclInt32:
+    case ncclUint32:
+      comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes * nRank, mscclpp::DataType::UINT32,
+                              *plan, stream);
+      break;
+    default:
+      return ncclInvalidArgument;
   }
 
   return ncclSuccess;
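
Usage note: the executor path is opt-in via the ALLGATHER*_JSON_FILE
variables; without them every call takes the fallback. A caller that
wants the in-place (*IPPlan) variants lays out its buffers so each
rank's input is its own slot of the output. A hedged, caller-side
sketch with hypothetical sizes; assumes an initialized ncclComm_t comm,
this process's rank, the world size nRank, and a cudaStream_t stream:

    size_t sendcount = 1 << 20;  // elements per rank (example value)
    float* recvbuff = nullptr;
    cudaMalloc(reinterpret_cast<void**>(&recvbuff), nRank * sendcount * sizeof(float));
    // Aliasing this rank's slot of recvbuff satisfies the in-place test
    // (sendbuff == (char*)recvbuff + rank * bytes), selecting the IP plans.
    float* sendbuff = recvbuff + rank * sendcount;
    // ... fill sendbuff with this rank's contribution ...
    ncclAllGather(sendbuff, recvbuff, sendcount, ncclFloat32, comm, stream);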