From 756f24c697a3469e100bd2ae10f9b53b64a1a1c2 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Fri, 6 Dec 2024 10:53:34 -0800 Subject: [PATCH 1/2] Revised ProxyChannel interfaces (#400) * Renamed `ProxyChannel` -> `BaseProxyChannel` and `SimpleProxyChannel` -> `ProxyChannel`. It makes the interface more consistent by defining channels to be associated with a certain src/dst memory region: `ProxyChannel` as "sema + src/dst + fifo" and `SmChannel` as "sema + src/dst". BaseProxyChannel is not associated with any memory regions, as "sema + fifo". * `ProxyChannelDeviceHandle` now inherits from `BaseProxyChannelDeviceHandle`, instead of having one as a member. --- README.md | 2 +- docs/design/design.md | 6 +- .../tutorials/initialization.md | 10 ++-- docs/getting-started/tutorials/python-api.md | 4 +- include/mscclpp/proxy_channel.hpp | 59 +++++++++++-------- include/mscclpp/proxy_channel_device.hpp | 37 ++++++------ python/mscclpp/__init__.py | 2 +- python/mscclpp/comm.py | 12 ++-- python/mscclpp/proxy_channel_py.cpp | 35 ++++++----- python/mscclpp_benchmark/allreduce.cu | 36 +++++------ ..._channel_test.cu => proxy_channel_test.cu} | 4 +- python/test/test_mscclpp.py | 16 ++--- src/executor/executor.cc | 8 +-- src/include/execution_common.hpp | 2 +- src/include/execution_kernel.hpp | 23 ++++---- src/proxy_channel.cc | 28 +++++---- src/semaphore.cc | 2 +- test/allgather_test_cpp.cu | 27 ++++----- test/mp_unit/mp_unit_tests.hpp | 2 +- test/mp_unit/proxy_channel_tests.cu | 40 ++++++------- test/mscclpp-test/allgather_test.cu | 32 +++++----- test/mscclpp-test/allreduce_test.cu | 54 ++++++++--------- test/mscclpp-test/alltoall_test.cu | 12 ++-- test/mscclpp-test/common.cc | 16 ++--- test/mscclpp-test/common.hpp | 4 +- 25 files changed, 239 insertions(+), 234 deletions(-) rename python/test/{simple_proxy_channel_test.cu => proxy_channel_test.cu} (87%) diff --git a/README.md b/README.md index 8e5e750d3..bd4cb644a 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ 
MSCCL++ provides peer-to-peer communication methods between GPUs. A peer-to-peer ```cpp // `ProxyChannel` will be explained in the following section. -__device__ mscclpp::DeviceHandle channel; +__device__ mscclpp::DeviceHandle channel; __global__ void gpuKernel() { ... // Only one thread is needed for this method. diff --git a/docs/design/design.md b/docs/design/design.md index 82b6e0965..c67e4d62a 100644 --- a/docs/design/design.md +++ b/docs/design/design.md @@ -117,7 +117,7 @@ In this section, we will discuss several use cases that demonstrate the capabili MSCCL++ enables the offloading of communication logic from the GPU to the CPU, facilitating the overlapping of communication and computation processes. The code snippet provided illustrates this overlapping technique. In the depicted scenario, the GPU emits a signal to the CPU indicating readiness for data transfer. Subsequently, while the GPU continues to execute computation tasks, the CPU initiates the data transfer to the designated target device. ```cpp -__device__ void gpuKernel(mscclpp::SimpleProxyChannelDeviceHandle* proxyChannel) { +__device__ void gpuKernel(mscclpp::ProxyChannelDeviceHandle* proxyChannel) { int tid = threadIdx.x + blockIdx.x * blockDim.x; // Send a trigger to the CPU if (tid == 0) { @@ -138,11 +138,11 @@ Traditional communication libraries enforce a separation between communication a MCSCL++ offers a low-level communication API, allowing users to design customized collective communication algorithms. The following code demonstrates how to implement a customized All2All algorithm using MSCCL++. ```cpp using DeviceHandle = mscclpp::DeviceHandle; -__device__ void localAlltoall(DeviceHandle* proxyChans, int rank, +__device__ void localAlltoall(DeviceHandle* proxyChans, int rank, int nRanksPerNode, size_t nElements) { int remoteRank = ((int)blockIdx.x < rank) ? 
blockIdx.x : blockIdx.x + 1; for (int i = 1; i < nRanksPerNode; i++) { - DeviceHandle proxyChan = proxyChans[blockIdx.x]; + DeviceHandle proxyChan = proxyChans[blockIdx.x]; if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank + i) % nRanksPerNode) { proxyChan.putWithSignalAndFlush(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), nElements * sizeof(int)); diff --git a/docs/getting-started/tutorials/initialization.md b/docs/getting-started/tutorials/initialization.md index 0bdd8ad45..b1d4c1d26 100644 --- a/docs/getting-started/tutorials/initialization.md +++ b/docs/getting-started/tutorials/initialization.md @@ -21,7 +21,7 @@ We will setup a mesh topology with eight GPUs. Each GPU will be connected to its template using DeviceHandle = mscclpp::DeviceHandle; -__constant__ DeviceHandle constProxyChans[8]; +__constant__ DeviceHandle constProxyChans[8]; void setupMeshTopology(int rank, int worldsize, void* data, size_t dataSize) { std::string ip_port = "10.0.0.4:50000"; @@ -55,17 +55,17 @@ void setupMeshTopology(int rank, int worldsize, void* data, size_t dataSize) { comm.setup(); - std::vector> proxyChannels; + std::vector> proxyChannels; for (size_t i = 0; i < semaphoreIds.size(); ++i) { - proxyChannels.push_back(mscclpp::deviceHandle(mscclpp::SimpleProxyChannel( + proxyChannels.push_back(mscclpp::deviceHandle(mscclpp::ProxyChannel( proxyService.proxyChannel(semaphoreIds[i]), proxyService.addMemory(remoteMemories[i].get()), proxyService.addMemory(localMemories[i])))); } - if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { + if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } CUDACHECK(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(), - sizeof(DeviceHandle) * proxyChannels.size())); + sizeof(DeviceHandle) * proxyChannels.size())); } ``` diff --git a/docs/getting-started/tutorials/python-api.md b/docs/getting-started/tutorials/python-api.md 
index 9e6c5627b..fcc7eee2f 100644 --- a/docs/getting-started/tutorials/python-api.md +++ b/docs/getting-started/tutorials/python-api.md @@ -47,7 +47,7 @@ We provide some Python utils to help you launch kernel via python. Here is a exa ```python from mscclpp.utils import KernelBuilder, pack -def launch_kernel(my_rank: int, nranks: int, simple_channels: List[SimpleProxyChannel], memory: cp.ndarray): +def launch_kernel(my_rank: int, nranks: int, simple_channels: List[ProxyChannel], memory: cp.ndarray): file_dir = os.path.dirname(os.path.abspath(__file__)) kernel = KernelBuilder(file="test.cu", kernel_name="test", file_dir=file_dir).get_compiled_kernel() params = b"" @@ -77,7 +77,7 @@ The test kernel is defined in `test.cu` as follows: // be careful about using channels[my_rank] as it is inavlie and it is there just for simplicity of indexing extern "C" __global__ void __launch_bounds__(1024, 1) - simple_proxy_channel(mscclpp::SimpleProxyChannelDeviceHandle* channels, int my_rank, int nranks, + proxy_channel(mscclpp::ProxyChannelDeviceHandle* channels, int my_rank, int nranks, int num_elements) { int tid = threadIdx.x; int nthreads = blockDim.x; diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 12612e7d0..91c67a2dd 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -11,6 +11,7 @@ namespace mscclpp { +struct BaseProxyChannel; struct ProxyChannel; /// Base class for proxy services. Proxy services are used to proxy data between devices. @@ -48,10 +49,17 @@ class ProxyService : public BaseProxyService { /// @return The semaphore. std::shared_ptr semaphore(SemaphoreId id) const; - /// Get a proxy channel by semaphore ID. + /// Get a base proxy channel by semaphore ID. /// @param id The ID of the semaphore. + /// @return The base proxy channel. + BaseProxyChannel baseProxyChannel(SemaphoreId id); + + /// Get a proxy channel by semaphore ID and memory regions. 
+ /// @param id The ID of the semaphore. + /// @param dst The destination memory region. + /// @param src The source memory region. /// @return The proxy channel. - ProxyChannel proxyChannel(SemaphoreId id); + ProxyChannel proxyChannel(SemaphoreId id, MemoryId dst, MemoryId src); /// Start the proxy service. void startProxy(); @@ -71,8 +79,8 @@ class ProxyService : public BaseProxyService { }; /// Proxy channel. -struct ProxyChannel { - private: +struct BaseProxyChannel { + protected: SemaphoreId semaphoreId_; std::shared_ptr semaphore_; @@ -80,57 +88,56 @@ struct ProxyChannel { std::shared_ptr proxy_; public: - ProxyChannel() = default; + BaseProxyChannel() = default; - ProxyChannel(SemaphoreId semaphoreId, std::shared_ptr semaphore, std::shared_ptr proxy); + BaseProxyChannel(SemaphoreId semaphoreId, std::shared_ptr semaphore, + std::shared_ptr proxy); - ProxyChannel(const ProxyChannel& other) = default; + BaseProxyChannel(const BaseProxyChannel& other) = default; - ProxyChannel& operator=(ProxyChannel& other) = default; + BaseProxyChannel& operator=(BaseProxyChannel& other) = default; - /// Device-side handle for @ref ProxyChannel. - using DeviceHandle = ProxyChannelDeviceHandle; + /// Device-side handle for @ref BaseProxyChannel. + using DeviceHandle = BaseProxyChannelDeviceHandle; /// Returns the device-side handle. /// - /// User should make sure the ProxyChannel is not released when using the returned handle. + /// User should make sure the BaseProxyChannel is not released when using the returned handle. /// DeviceHandle deviceHandle() const; }; -/// Simple proxy channel with a single destination and source memory region. -struct SimpleProxyChannel { +/// A common form of proxy channel with a single destination and source memory region. +struct ProxyChannel : public BaseProxyChannel { private: - ProxyChannel proxyChan_; MemoryId dst_; MemoryId src_; public: /// Default constructor. - SimpleProxyChannel() = default; + ProxyChannel() = default; /// Constructor. 
- /// @param proxyChan The proxy channel. + /// @param semaphoreId The ID of the semaphore. + /// @param semaphore The semaphore. + /// @param proxy The proxy. /// @param dst The destination memory region. /// @param src The source memory region. - SimpleProxyChannel(ProxyChannel proxyChan, MemoryId dst, MemoryId src); - - /// Constructor. - /// @param proxyChan The proxy channel. - SimpleProxyChannel(ProxyChannel proxyChan) : proxyChan_(proxyChan) {} + ProxyChannel(SemaphoreId semaphoreId, std::shared_ptr semaphore, std::shared_ptr proxy, + MemoryId dst, MemoryId src); /// Copy constructor. - SimpleProxyChannel(const SimpleProxyChannel& other) = default; + ProxyChannel(const ProxyChannel& other) = default; /// Assignment operator. - SimpleProxyChannel& operator=(SimpleProxyChannel& other) = default; + ProxyChannel& operator=(ProxyChannel& other) = default; - /// Device-side handle for @ref SimpleProxyChannel. - using DeviceHandle = SimpleProxyChannelDeviceHandle; + /// Device-side handle for @ref ProxyChannel. + using DeviceHandle = ProxyChannelDeviceHandle; /// Returns the device-side handle. /// - /// User should make sure the SimpleProxyChannel is not released when using the returned handle. + /// User should make sure the ProxyChannel is not released when using the returned handle. /// DeviceHandle deviceHandle() const; }; diff --git a/include/mscclpp/proxy_channel_device.hpp b/include/mscclpp/proxy_channel_device.hpp index 2b91dcec4..b93b215eb 100644 --- a/include/mscclpp/proxy_channel_device.hpp +++ b/include/mscclpp/proxy_channel_device.hpp @@ -83,7 +83,7 @@ union ChannelTrigger { #endif // defined(MSCCLPP_DEVICE_COMPILE) }; -struct ProxyChannelDeviceHandle { +struct BaseProxyChannelDeviceHandle { SemaphoreId semaphoreId_; Host2DeviceSemaphoreDeviceHandle semaphore_; @@ -92,6 +92,12 @@ struct ProxyChannelDeviceHandle { // can produce for and the sole proxy thread consumes it. 
FifoDeviceHandle fifo_; + BaseProxyChannelDeviceHandle() {} + + BaseProxyChannelDeviceHandle(SemaphoreId semaphoreId, Host2DeviceSemaphoreDeviceHandle semaphore, + FifoDeviceHandle fifo) + : semaphoreId_(semaphoreId), semaphore_(semaphore), fifo_(fifo) {} + #if defined(MSCCLPP_DEVICE_COMPILE) /// Push a @ref TriggerData to the FIFO. /// @param dst The destination memory region. @@ -175,18 +181,23 @@ struct ProxyChannelDeviceHandle { #endif // defined(MSCCLPP_DEVICE_COMPILE) }; -struct SimpleProxyChannelDeviceHandle { - ProxyChannelDeviceHandle proxyChan_; +struct ProxyChannelDeviceHandle : public BaseProxyChannelDeviceHandle { MemoryId dst_; MemoryId src_; + ProxyChannelDeviceHandle(){}; + + ProxyChannelDeviceHandle(SemaphoreId semaphoreId, Host2DeviceSemaphoreDeviceHandle semaphore, FifoDeviceHandle fifo, + MemoryId dst, MemoryId src) + : BaseProxyChannelDeviceHandle(semaphoreId, semaphore, fifo), dst_(dst), src_(src) {} + #if defined(MSCCLPP_DEVICE_COMPILE) /// Push a @ref TriggerData to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. MSCCLPP_DEVICE_INLINE void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { - proxyChan_.put(dst_, dstOffset, src_, srcOffset, size); + BaseProxyChannelDeviceHandle::put(dst_, dstOffset, src_, srcOffset, size); } /// Push a @ref TriggerData to the FIFO. @@ -194,15 +205,12 @@ struct SimpleProxyChannelDeviceHandle { /// @param size The size of the transfer. MSCCLPP_DEVICE_INLINE void put(uint64_t offset, uint64_t size) { put(offset, offset, size); } - /// Push a @ref TriggerFlag to the FIFO. - MSCCLPP_DEVICE_INLINE void signal() { proxyChan_.signal(); } - /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. 
/// @param size The size of the transfer. MSCCLPP_DEVICE_INLINE void putWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { - proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); + BaseProxyChannelDeviceHandle::putWithSignal(dst_, dstOffset, src_, srcOffset, size); } /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. @@ -215,7 +223,7 @@ struct SimpleProxyChannelDeviceHandle { /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { - proxyChan_.putWithSignalAndFlush(dst_, dstOffset, src_, srcOffset, size); + BaseProxyChannelDeviceHandle::putWithSignalAndFlush(dst_, dstOffset, src_, srcOffset, size); } /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. @@ -224,17 +232,6 @@ struct SimpleProxyChannelDeviceHandle { MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(uint64_t offset, uint64_t size) { putWithSignalAndFlush(offset, offset, size); } - - /// Push a @ref TriggerSync to the FIFO. - MSCCLPP_DEVICE_INLINE void flush() { proxyChan_.flush(); } - - /// Check if the proxy channel has been signaled. - /// @return true if the proxy channel has been signaled. - MSCCLPP_DEVICE_INLINE bool poll() { return proxyChan_.poll(); } - - /// Wait for the proxy channel to be signaled. - /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. 
- MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 10000000) { proxyChan_.wait(maxSpinCount); } #endif // defined(MSCCLPP_DEVICE_COMPILE) }; diff --git a/python/mscclpp/__init__.py b/python/mscclpp/__init__.py index 1b79fc130..1c2567f42 100644 --- a/python/mscclpp/__init__.py +++ b/python/mscclpp/__init__.py @@ -14,7 +14,7 @@ numa, ProxyService, RegisteredMemory, - SimpleProxyChannel, + ProxyChannel, SmChannel, SmDevice2DeviceSemaphore, TcpBootstrap, diff --git a/python/mscclpp/comm.py b/python/mscclpp/comm.py index f370b9441..c2726826f 100644 --- a/python/mscclpp/comm.py +++ b/python/mscclpp/comm.py @@ -14,7 +14,7 @@ Host2HostSemaphore, ProxyService, RegisteredMemory, - SimpleProxyChannel, + ProxyChannel, SmChannel, SmDevice2DeviceSemaphore, TcpBootstrap, @@ -180,8 +180,8 @@ def make_proxy_channels( semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank]) channels = {} for rank in semaphores: - channels[rank] = SimpleProxyChannel( - proxy_service.proxy_channel(semaphore_ids[rank]), memory_ids[rank], memory_ids[self.my_rank] + channels[rank] = proxy_service.proxy_channel( + semaphore_ids[rank], memory_ids[rank], memory_ids[self.my_rank] ) return channels @@ -218,8 +218,8 @@ def make_proxy_channels_with_scratch( semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank]) channels = {} for rank in semaphores: - channels[rank] = SimpleProxyChannel( - proxy_service.proxy_channel(semaphore_ids[rank]), memory_ids[rank], memory_ids[self.my_rank] + channels[rank] = proxy_service.proxy_channel( + semaphore_ids[rank], memory_ids[rank], memory_ids[self.my_rank] ) return channels @@ -232,7 +232,7 @@ def register_semaphore_with_proxy( semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank]) channels = {} for rank in semaphores: - channels[rank] = proxy_service.proxy_channel(semaphore_ids[rank]) + channels[rank] = proxy_service.base_proxy_channel(semaphore_ids[rank]) return channels def register_memory_with_proxy( diff --git 
a/python/mscclpp/proxy_channel_py.cpp b/python/mscclpp/proxy_channel_py.cpp index f068c407d..dfe882228 100644 --- a/python/mscclpp/proxy_channel_py.cpp +++ b/python/mscclpp/proxy_channel_py.cpp @@ -23,11 +23,26 @@ void register_proxy_channel(nb::module_& m) { .def("add_semaphore", &ProxyService::addSemaphore, nb::arg("semaphore")) .def("add_memory", &ProxyService::addMemory, nb::arg("memory")) .def("semaphore", &ProxyService::semaphore, nb::arg("id")) - .def("proxy_channel", &ProxyService::proxyChannel, nb::arg("id")); + .def("base_proxy_channel", &ProxyService::baseProxyChannel, nb::arg("id")) + .def("proxy_channel", &ProxyService::proxyChannel, nb::arg("id"), nb::arg("dst"), nb::arg("src")); - nb::class_(m, "ProxyChannel") + nb::class_(m, "BaseProxyChannel") .def(nb::init, std::shared_ptr>(), nb::arg("semaphoreId"), nb::arg("semaphore"), nb::arg("proxy")) + .def("device_handle", &BaseProxyChannel::deviceHandle); + + nb::class_(m, "BaseProxyChannelDeviceHandle") + .def(nb::init<>()) + .def_rw("semaphoreId_", &BaseProxyChannel::DeviceHandle::semaphoreId_) + .def_rw("semaphore_", &BaseProxyChannel::DeviceHandle::semaphore_) + .def_rw("fifo_", &BaseProxyChannel::DeviceHandle::fifo_) + .def_prop_ro("raw", [](const BaseProxyChannel::DeviceHandle& self) -> nb::bytes { + return nb::bytes(reinterpret_cast(&self), sizeof(self)); + }); + + nb::class_(m, "ProxyChannel") + .def(nb::init, std::shared_ptr, MemoryId, MemoryId>(), + nb::arg("semaphoreId"), nb::arg("semaphore"), nb::arg("proxy"), nb::arg("dst"), nb::arg("src")) .def("device_handle", &ProxyChannel::deviceHandle); nb::class_(m, "ProxyChannelDeviceHandle") @@ -35,21 +50,9 @@ void register_proxy_channel(nb::module_& m) { .def_rw("semaphoreId_", &ProxyChannel::DeviceHandle::semaphoreId_) .def_rw("semaphore_", &ProxyChannel::DeviceHandle::semaphore_) .def_rw("fifo_", &ProxyChannel::DeviceHandle::fifo_) + .def_rw("src_", &ProxyChannel::DeviceHandle::src_) + .def_rw("dst_", &ProxyChannel::DeviceHandle::dst_) 
.def_prop_ro("raw", [](const ProxyChannel::DeviceHandle& self) -> nb::bytes { return nb::bytes(reinterpret_cast(&self), sizeof(self)); }); - - nb::class_(m, "SimpleProxyChannel") - .def(nb::init(), nb::arg("proxyChan"), nb::arg("dst"), nb::arg("src")) - .def(nb::init(), nb::arg("proxyChan")) - .def("device_handle", &SimpleProxyChannel::deviceHandle); - - nb::class_(m, "SimpleProxyChannelDeviceHandle") - .def(nb::init<>()) - .def_rw("proxyChan_", &SimpleProxyChannel::DeviceHandle::proxyChan_) - .def_rw("src_", &SimpleProxyChannel::DeviceHandle::src_) - .def_rw("dst_", &SimpleProxyChannel::DeviceHandle::dst_) - .def_prop_ro("raw", [](const SimpleProxyChannel::DeviceHandle& self) -> nb::bytes { - return nb::bytes(reinterpret_cast(&self), sizeof(self)); - }); }; diff --git a/python/mscclpp_benchmark/allreduce.cu b/python/mscclpp_benchmark/allreduce.cu index 0adf8d548..4c9851b9a 100644 --- a/python/mscclpp_benchmark/allreduce.cu +++ b/python/mscclpp_benchmark/allreduce.cu @@ -301,9 +301,8 @@ extern "C" __global__ void __launch_bounds__(1024, 1) // ------------------------------------------- extern "C" __global__ void __launch_bounds__(1024, 1) - allreduce3(mscclpp::SimpleProxyChannelDeviceHandle* fstRoundChans, - mscclpp::SimpleProxyChannelDeviceHandle* sndRoundChans, TYPE* buff, TYPE* scratch, int rank, - int worldSize, size_t nelems) { + allreduce3(mscclpp::ProxyChannelDeviceHandle* fstRoundChans, mscclpp::ProxyChannelDeviceHandle* sndRoundChans, + TYPE* buff, TYPE* scratch, int rank, int worldSize, size_t nelems) { nelems = nelems / (sizeof(int) / sizeof(TYPE)); int isComm = (threadIdx.x == 0) && (blockIdx.x == 0); @@ -312,10 +311,10 @@ extern "C" __global__ void __launch_bounds__(1024, 1) int peerSendId = (remoteSendRank < rank) ? remoteSendRank : remoteSendRank - 1; int peerRecvId = (remoteRecvRank < rank) ? 
remoteRecvRank : remoteRecvRank - 1; - mscclpp::SimpleProxyChannelDeviceHandle& devFstSendChan = fstRoundChans[peerSendId]; - mscclpp::SimpleProxyChannelDeviceHandle& devFstRecvChan = fstRoundChans[peerRecvId]; - mscclpp::SimpleProxyChannelDeviceHandle& devSndSendChan = sndRoundChans[peerSendId]; - mscclpp::SimpleProxyChannelDeviceHandle& devSndRecvChan = sndRoundChans[peerRecvId]; + mscclpp::ProxyChannelDeviceHandle& devFstSendChan = fstRoundChans[peerSendId]; + mscclpp::ProxyChannelDeviceHandle& devFstRecvChan = fstRoundChans[peerRecvId]; + mscclpp::ProxyChannelDeviceHandle& devSndSendChan = sndRoundChans[peerSendId]; + mscclpp::ProxyChannelDeviceHandle& devSndRecvChan = sndRoundChans[peerRecvId]; // Step 1 size_t chunkIndex = (rank + worldSize - 1) % worldSize; @@ -529,9 +528,8 @@ __device__ void localAllGatherAllPairsSm(mscclpp::SmChannelDeviceHandle* smChans } // This is an allgather4 equivalent -__device__ void allGatherSm(mscclpp::SmChannelDeviceHandle* smChans, - mscclpp::SimpleProxyChannelDeviceHandle* proxyChans, int rank, int worldSize, - int nRanksPerNode, size_t nelemsPerGPU, int pipelineDepth) { +__device__ void allGatherSm(mscclpp::SmChannelDeviceHandle* smChans, mscclpp::ProxyChannelDeviceHandle* proxyChans, + int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU, int pipelineDepth) { // this allgather is a pipelined and hierarchical one and only works for two nodes // it is implemented as follows: // Step 1: each node does a local allgather and concurrently, @@ -546,7 +544,7 @@ __device__ void allGatherSm(mscclpp::SmChannelDeviceHandle* smChans, int peerRank = (rank + nRanksPerNode) % worldSize; int peerNodeId = peerRank / nRanksPerNode; int peer = (peerRank < rank) ? 
peerRank : peerRank - 1; - mscclpp::SimpleProxyChannelDeviceHandle proxyChan = proxyChans[peer]; + mscclpp::ProxyChannelDeviceHandle proxyChan = proxyChans[peer]; const size_t nBlocksForLocalAllGather = gridDim.x / (nRanksPerNode - 1) * (nRanksPerNode - 1); const size_t rankChunkSize = nelemsPerGPU * sizeof(int); const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode; @@ -590,9 +588,8 @@ __device__ void allGatherSm(mscclpp::SmChannelDeviceHandle* smChans, nBlocksForLocalAllGather); } -__device__ void reduceScatterSm(mscclpp::SmChannelDeviceHandle* smChans, - mscclpp::SimpleProxyChannelDeviceHandle* proxyChans, TYPE* buff, TYPE* scratch, - int rank, int nRanksPerNode, int worldSize, +__device__ void reduceScatterSm(mscclpp::SmChannelDeviceHandle* smChans, mscclpp::ProxyChannelDeviceHandle* proxyChans, + TYPE* buff, TYPE* scratch, int rank, int nRanksPerNode, int worldSize, size_t nelems, // must be divisible by 3 int pipelineDepth) { // this reduce-scatter algorithm works as follows: @@ -615,7 +612,7 @@ __device__ void reduceScatterSm(mscclpp::SmChannelDeviceHandle* smChans, int isComm = (threadIdx.x == 0) && (blockIdx.x == nBlocksForReduceScatter); int peer = (peerRank < rank) ? 
peerRank : peerRank - 1; int nBlocksRemain = gridDim.x - nBlocksForReduceScatter; - mscclpp::SimpleProxyChannelDeviceHandle proxyChan = proxyChans[peer]; + mscclpp::ProxyChannelDeviceHandle proxyChan = proxyChans[peer]; if (peerNodeId == rank / nRanksPerNode) { localReduceScatterSm(smChans, buff, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize, gridDim.x); return; @@ -675,9 +672,8 @@ __device__ void reduceScatterSm(mscclpp::SmChannelDeviceHandle* smChans, } extern "C" __global__ void __launch_bounds__(1024, 1) __global__ - allreduce4(mscclpp::SmChannelDeviceHandle* smChans, - mscclpp::SimpleProxyChannelDeviceHandle* reduceScatterProxyChans, - mscclpp::SimpleProxyChannelDeviceHandle* allGatherProxyChans, TYPE* buff, TYPE* scratch, int rank, + allreduce4(mscclpp::SmChannelDeviceHandle* smChans, mscclpp::ProxyChannelDeviceHandle* reduceScatterProxyChans, + mscclpp::ProxyChannelDeviceHandle* allGatherProxyChans, TYPE* buff, TYPE* scratch, int rank, int nRanksPerNode, int worldSize, size_t nelems, int pipelineDepth) { nelems = nelems / (sizeof(int) / sizeof(TYPE)); reduceScatterSm(smChans, reduceScatterProxyChans, buff, scratch, rank, nRanksPerNode, worldSize, nelems, @@ -688,7 +684,7 @@ extern "C" __global__ void __launch_bounds__(1024, 1) __global__ // allreduce 5 for 2-nodes extern "C" __global__ void __launch_bounds__(1024, 1) - allreduce5(mscclpp::SmChannelDeviceHandle* smChans, mscclpp::SimpleProxyChannelDeviceHandle* proxyChans, TYPE* buff, + allreduce5(mscclpp::SmChannelDeviceHandle* smChans, mscclpp::ProxyChannelDeviceHandle* proxyChans, TYPE* buff, TYPE* scratch, TYPE* putBuff, TYPE* resultBuff, int rank, int nRanksPerNode, int worldSize, size_t nelems) { nelems = nelems / (sizeof(int) / sizeof(TYPE)); @@ -706,7 +702,7 @@ extern "C" __global__ void __launch_bounds__(1024, 1) const int peerIdx = blockIdx.x / nBlocksPerPeer; const int remoteRankIdx = peerIdx < localRankId ? 
peerIdx : peerIdx + 1; mscclpp::SmChannelDeviceHandle smChan = smChans[peerIdx]; - mscclpp::SimpleProxyChannelDeviceHandle proxyChan = proxyChans[localRankId]; + mscclpp::ProxyChannelDeviceHandle proxyChan = proxyChans[localRankId]; const int tid = threadIdx.x + localBlockIdx * blockDim.x; // double buffering size_t scratchBaseOffset = (flag & 1) ? 0 : nPkts * sizeof(mscclpp::LLPacket); diff --git a/python/test/simple_proxy_channel_test.cu b/python/test/proxy_channel_test.cu similarity index 87% rename from python/test/simple_proxy_channel_test.cu rename to python/test/proxy_channel_test.cu index 0a7542768..d79a97bf6 100644 --- a/python/test/simple_proxy_channel_test.cu +++ b/python/test/proxy_channel_test.cu @@ -6,8 +6,8 @@ // be careful about using channels[my_rank] as it is inavlie and it is there just for simplicity of indexing extern "C" __global__ void __launch_bounds__(1024, 1) - simple_proxy_channel(mscclpp::SimpleProxyChannelDeviceHandle* channels, int my_rank, int nranks, int* data, - int* scratch, int num_elements, int use_packet) { + proxy_channel(mscclpp::ProxyChannelDeviceHandle* channels, int my_rank, int nranks, int* data, int* scratch, + int num_elements, int use_packet) { int tid = threadIdx.x; int nthreads = blockDim.x; uint64_t size_per_rank = (num_elements * sizeof(int)) / nranks; diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index 3e7fa90da..929d975c6 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -346,9 +346,9 @@ def __init__( ).get_compiled_kernel() self.nblocks = 1 self.nthreads = nranks - elif test_name == "simple_proxy_channel": + elif test_name == "proxy_channel": self._kernel = KernelBuilder( - file="simple_proxy_channel_test.cu", kernel_name="simple_proxy_channel", file_dir=file_dir + file="proxy_channel_test.cu", kernel_name="proxy_channel", file_dir=file_dir ).get_compiled_kernel() self.nblocks = 1 self.nthreads = 1024 @@ -376,11 +376,11 @@ def __init__( # keep a reference to 
the device handles so that they don't get garbage collected self._d_semaphore_or_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8) - if test_name in ["h2d_semaphore", "d2d_semaphore", "sm_channel", "simple_proxy_channel"]: + if test_name in ["h2d_semaphore", "d2d_semaphore", "sm_channel", "proxy_channel"]: self.params += pack(self._d_semaphore_or_channels, my_rank, nranks) if test_name == "sm_channel": self.params += pack(tensor.size, use_packet) - if test_name == "simple_proxy_channel": + if test_name == "proxy_channel": self.params += pack(tensor, scratch, tensor.size, use_packet) elif test_name == "fifo": self.params = fifo.device_handle().raw @@ -531,7 +531,7 @@ def test_proxy(mpi_group: MpiGroup, nelem: int, transport: str): @pytest.mark.parametrize("nelem", [2**i for i in [10, 15, 20]]) @pytest.mark.parametrize("transport", ["NVLink", "IB"]) @pytest.mark.parametrize("use_packet", [False, True]) -def test_simple_proxy_channel(mpi_group: MpiGroup, nelem: int, transport: str, use_packet: bool): +def test_proxy_channel(mpi_group: MpiGroup, nelem: int, transport: str, use_packet: bool): group, connections = create_group_and_connection(mpi_group, transport) memory = cp.zeros(nelem, dtype=cp.int32) @@ -552,13 +552,13 @@ def test_simple_proxy_channel(mpi_group: MpiGroup, nelem: int, transport: str, u memory_to_register = scratch else: memory_to_register = memory - simple_channels = group.make_proxy_channels(proxy_service, memory_to_register, connections) + channels = group.make_proxy_channels(proxy_service, memory_to_register, connections) kernel = MscclppKernel( - "simple_proxy_channel", + "proxy_channel", my_rank=group.my_rank, nranks=group.nranks, - semaphore_or_channels=simple_channels, + semaphore_or_channels=channels, tensor=memory, use_packet=use_packet, scratch=scratch, diff --git a/src/executor/executor.cc b/src/executor/executor.cc index ae34fa1bb..ad694b3f7 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ 
-116,7 +116,7 @@ struct ExecutionContext { std::vector> smSemaphores; std::vector proxySemaphores; std::vector smChannels; - std::vector proxyChannels; + std::vector proxyChannels; std::vector nvlsChannels; std::unordered_map> deviceExecutionPlans; std::unordered_map> deviceExecutionPlansBuffers; @@ -339,10 +339,10 @@ struct Executor::Impl { context.smChannels.emplace_back(context.smSemaphores[index++], context.registeredMemories[{info.dstBufferType, peer}], src, nullptr); } else if (channelType == ChannelType::PROXY) { - context.proxyChannels.emplace_back( - context.proxyService->proxyChannel(context.proxySemaphores[index++]), + context.proxyChannels.emplace_back(context.proxyService->proxyChannel( + context.proxySemaphores[index++], context.proxyService->addMemory(context.registeredMemories[{info.dstBufferType, peer}]), - context.proxyService->addMemory(localMemory)); + context.proxyService->addMemory(localMemory))); } } } diff --git a/src/include/execution_common.hpp b/src/include/execution_common.hpp index d0d0dc30d..f6ed215e1 100644 --- a/src/include/execution_common.hpp +++ b/src/include/execution_common.hpp @@ -54,7 +54,7 @@ enum class OperationType : uint8_t { struct Channels { mscclpp::DeviceHandle smChannels[MAX_CHANNEL]; - mscclpp::DeviceHandle proxyChannels[MAX_CHANNEL]; + mscclpp::DeviceHandle proxyChannels[MAX_CHANNEL]; mscclpp::DeviceHandle nvlsChannels[MAX_CHANNEL]; }; diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 1b0490f91..98bed37eb 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -192,9 +192,8 @@ MSCCLPP_DEVICE_INLINE T* getBuffer(T* input, T* output, T* scratch, BufferType b return nullptr; } -MSCCLPP_DEVICE_INLINE void handleSignal(DeviceHandle* smChannels, - DeviceHandle* proxyChannels, uint8_t* channelIndex, - int nChannels, ChannelType chType) { +MSCCLPP_DEVICE_INLINE void handleSignal(DeviceHandle* smChannels, DeviceHandle* proxyChannels, + uint8_t* 
channelIndex, int nChannels, ChannelType chType) { int tid = threadIdx.x; if (tid < nChannels && chType == ChannelType::SM) { smChannels[channelIndex[tid]].signal(); @@ -205,9 +204,8 @@ MSCCLPP_DEVICE_INLINE void handleSignal(DeviceHandle* smChannels, } } -MSCCLPP_DEVICE_INLINE void handleWait(DeviceHandle* smChannels, - DeviceHandle* proxyChannels, uint8_t* channelIndexes, - int nChannels, ChannelType chType) { +MSCCLPP_DEVICE_INLINE void handleWait(DeviceHandle* smChannels, DeviceHandle* proxyChannels, + uint8_t* channelIndexes, int nChannels, ChannelType chType) { int tid = threadIdx.x; if (tid < nChannels && chType == ChannelType::SM) { smChannels[channelIndexes[tid]].wait(); @@ -218,7 +216,7 @@ MSCCLPP_DEVICE_INLINE void handleWait(DeviceHandle* smChannels, } } -MSCCLPP_DEVICE_INLINE void handleFlush(DeviceHandle* proxyChannels, uint8_t* channelIndexes, +MSCCLPP_DEVICE_INLINE void handleFlush(DeviceHandle* proxyChannels, uint8_t* channelIndexes, int nChannels) { int tid = threadIdx.x; if (tid < nChannels) { @@ -236,10 +234,9 @@ MSCCLPP_DEVICE_INLINE void handleGet(DeviceHandle* smChannel, uint8_t } template -MSCCLPP_DEVICE_INLINE void handlePut(DeviceHandle* smChannel, - DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, - uint32_t* dstOffsets, uint32_t* srcOffsets, int count, uint32_t size, - ChannelType chType) { +MSCCLPP_DEVICE_INLINE void handlePut(DeviceHandle* smChannel, DeviceHandle* proxyChannels, + uint8_t* dstChannelIndexes, uint32_t* dstOffsets, uint32_t* srcOffsets, int count, + uint32_t size, ChannelType chType) { if (chType == ChannelType::SM) { for (int i = 0; i < count; i++) { uint32_t dstOffset = dstOffsets[i]; @@ -311,7 +308,7 @@ MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* output, uint32_t outputOf template MSCCLPP_DEVICE_INLINE void handlePutPacket(size_t scratchSize, DeviceHandle* smChannels, - DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, + DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, uint32_t* 
dstOffsets, uint32_t* srcOffsets, int nDstChannels, uint32_t size, ChannelType chType, uint32_t flag) { const size_t scratchBaseOffset = flag & 0x1 ? 0 : scratchSize >> 1; @@ -496,7 +493,7 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu int nOperations = localPlan->nOperations; Operation* operations = localPlan->operations; DeviceHandle* smChannels = localPlan->channels.smChannels; - DeviceHandle* proxyChannels = localPlan->channels.proxyChannels; + DeviceHandle* proxyChannels = localPlan->channels.proxyChannels; [[maybe_unused]] DeviceHandle* nvlsChannels = localPlan->channels.nvlsChannels; diff --git a/src/proxy_channel.cc b/src/proxy_channel.cc index 3e63e54eb..f231e73a1 100644 --- a/src/proxy_channel.cc +++ b/src/proxy_channel.cc @@ -9,12 +9,14 @@ namespace mscclpp { -MSCCLPP_API_CPP ProxyChannel::ProxyChannel(SemaphoreId semaphoreId, std::shared_ptr semaphore, - std::shared_ptr proxy) +MSCCLPP_API_CPP BaseProxyChannel::BaseProxyChannel(SemaphoreId semaphoreId, + std::shared_ptr semaphore, + std::shared_ptr proxy) : semaphoreId_(semaphoreId), semaphore_(semaphore), proxy_(proxy) {} -MSCCLPP_API_CPP SimpleProxyChannel::SimpleProxyChannel(ProxyChannel proxyChan, MemoryId dst, MemoryId src) - : proxyChan_(proxyChan), dst_(dst), src_(src) {} +MSCCLPP_API_CPP ProxyChannel::ProxyChannel(SemaphoreId semaphoreId, std::shared_ptr semaphore, + std::shared_ptr proxy, MemoryId dst, MemoryId src) + : BaseProxyChannel(semaphoreId, semaphore, proxy), dst_(dst), src_(src) {} MSCCLPP_API_CPP ProxyService::ProxyService(size_t fifoSize) : proxy_(std::make_shared([&](ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, @@ -44,8 +46,12 @@ MSCCLPP_API_CPP std::shared_ptr ProxyService::semaphore(Se return semaphores_[id]; } -MSCCLPP_API_CPP ProxyChannel ProxyService::proxyChannel(SemaphoreId id) { - return ProxyChannel(id, semaphores_[id], proxy_); +MSCCLPP_API_CPP BaseProxyChannel ProxyService::baseProxyChannel(SemaphoreId id) { + 
return BaseProxyChannel(id, semaphores_[id], proxy_); +} + +MSCCLPP_API_CPP ProxyChannel ProxyService::proxyChannel(SemaphoreId id, MemoryId dst, MemoryId src) { + return ProxyChannel(id, semaphores_[id], proxy_, dst, src); } MSCCLPP_API_CPP void ProxyService::startProxy() { proxy_->start(); } @@ -84,13 +90,13 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { return result; } -MSCCLPP_API_CPP ProxyChannel::DeviceHandle ProxyChannel::deviceHandle() const { - return ProxyChannel::DeviceHandle{ - .semaphoreId_ = semaphoreId_, .semaphore_ = semaphore_->deviceHandle(), .fifo_ = proxy_->fifo().deviceHandle()}; +MSCCLPP_API_CPP BaseProxyChannel::DeviceHandle BaseProxyChannel::deviceHandle() const { + return BaseProxyChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo().deviceHandle()); } -MSCCLPP_API_CPP SimpleProxyChannel::DeviceHandle SimpleProxyChannel::deviceHandle() const { - return SimpleProxyChannel::DeviceHandle{.proxyChan_ = proxyChan_.deviceHandle(), .dst_ = dst_, .src_ = src_}; +MSCCLPP_API_CPP ProxyChannel::DeviceHandle ProxyChannel::deviceHandle() const { + return ProxyChannel::DeviceHandle(semaphoreId_, semaphore_->deviceHandle(), proxy_->fifo().deviceHandle(), dst_, + src_); } } // namespace mscclpp diff --git a/src/semaphore.cc b/src/semaphore.cc index 7dec60c3d..348f1cdb1 100644 --- a/src/semaphore.cc +++ b/src/semaphore.cc @@ -94,7 +94,7 @@ MSCCLPP_API_CPP SmDevice2DeviceSemaphore::SmDevice2DeviceSemaphore(Communicator& setupInboundSemaphoreId(communicator, connection.get(), localInboundSemaphore_.get()); isRemoteInboundSemaphoreIdSet_ = true; } else if (AllIBTransports.has(connection->transport())) { - // We don't need to really with any of the IB transports, since the values will be local + // Should we throw an error here? 
isRemoteInboundSemaphoreIdSet_ = false; } } diff --git a/test/allgather_test_cpp.cu b/test/allgather_test_cpp.cu index 2f56b221d..0f5d37759 100644 --- a/test/allgather_test_cpp.cu +++ b/test/allgather_test_cpp.cu @@ -40,9 +40,9 @@ static double getTime(void) { template using DeviceHandle = mscclpp::DeviceHandle; -__constant__ DeviceHandle constProxyChans[16]; +__constant__ DeviceHandle constProxyChans[16]; -__device__ void allgather0(DeviceHandle proxyChan, int rank, size_t nelemsPerGPU) { +__device__ void allgather0(DeviceHandle proxyChan, int rank, size_t nelemsPerGPU) { // this allgather is really simple and implemented as an alltoall // this thread's role is a sender role @@ -57,7 +57,7 @@ __device__ void allgather0(DeviceHandle proxyChan, if ((threadIdx.x % 32) == 0) proxyChan.wait(); } -__device__ void localAllGather(DeviceHandle proxyChan, int rank, int nranksPerNode, +__device__ void localAllGather(DeviceHandle proxyChan, int rank, int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) { // this allgather algorithm works as follows: // Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode @@ -83,16 +83,16 @@ __device__ void localAllGather(DeviceHandle proxyCh } } -__device__ void allgather1(DeviceHandle proxyChan, int rank, int nranksPerNode, - int remoteRank, size_t nelemsPerGPU) { +__device__ void allgather1(DeviceHandle proxyChan, int rank, int nranksPerNode, int remoteRank, + size_t nelemsPerGPU) { localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); if (remoteRank / nranksPerNode == rank / nranksPerNode) if ((threadIdx.x % 32) == 0) proxyChan.flush(); } -__device__ void allgather2(DeviceHandle proxyChan, int rank, int world_size, - int nranksPerNode, int remoteRank, size_t nelemsPerGPU) { +__device__ void allgather2(DeviceHandle proxyChan, int rank, int world_size, int nranksPerNode, + int remoteRank, size_t nelemsPerGPU) { // this allgather is a pipelined and 
hierarchical one and only works for two nodes // it is implemented as follows: // Step 1: each node does a local allgather and concurrently, @@ -159,7 +159,7 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem int warpId = threadIdx.x / 32; int remoteRank = (warpId < rank) ? warpId : warpId + 1; // Each warp is responsible for one of the remote ranks - DeviceHandle proxyChan = constProxyChans[warpId]; + DeviceHandle proxyChan = constProxyChans[warpId]; if (kernel == 0) allgather0(proxyChan, rank, nelemsPerGPU); @@ -234,18 +234,17 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co comm.setup(); - std::vector> proxyChannels; + std::vector> proxyChannels; for (size_t i = 0; i < semaphoreIds.size(); ++i) { - proxyChannels.push_back(mscclpp::deviceHandle(mscclpp::SimpleProxyChannel( - proxyService.proxyChannel(semaphoreIds[i]), proxyService.addMemory(remoteMemories[i].get()), - proxyService.addMemory(localMemories[i])))); + proxyChannels.push_back(mscclpp::deviceHandle(proxyService.proxyChannel( + semaphoreIds[i], proxyService.addMemory(remoteMemories[i].get()), proxyService.addMemory(localMemories[i])))); } - if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { + if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } CUDACHECK(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(), - sizeof(DeviceHandle) * proxyChannels.size())); + sizeof(DeviceHandle) * proxyChannels.size())); } void printUsage(const char* prog, bool isMpi) { diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index 8afa8e917..e1c9068ef 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -140,7 +140,7 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase { void SetUp() override; void TearDown() override; - void setupMeshConnections(std::vector& proxyChannels, bool useIPC, bool useIb, 
+ void setupMeshConnections(std::vector& proxyChannels, bool useIPC, bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); void testPingPong(PingPongTestParams params); diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 75858b631..79ca9b656 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -15,9 +15,9 @@ void ProxyChannelOneToOneTest::SetUp() { void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } -void ProxyChannelOneToOneTest::setupMeshConnections(std::vector& proxyChannels, - bool useIPC, bool useIb, bool useEthernet, void* sendBuff, - size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { +void ProxyChannelOneToOneTest::setupMeshConnections(std::vector& proxyChannels, bool useIPC, + bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes, + void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); @@ -64,17 +64,17 @@ void ProxyChannelOneToOneTest::setupMeshConnections(std::vectorbuildAndAddSemaphore(*communicator, connectionFutures[r].get()); - proxyChannels.emplace_back(proxyService->proxyChannel(cid), proxyService->addMemory(remoteMemFutures[r].get()), - proxyService->addMemory(sendBufRegMem)); + proxyChannels.emplace_back(proxyService->proxyChannel(cid, proxyService->addMemory(remoteMemFutures[r].get()), + proxyService->addMemory(sendBufRegMem))); } communicator->setup(); } -__constant__ DeviceHandle gChannelOneToOneTestConstProxyChans; +__constant__ DeviceHandle gChannelOneToOneTestConstProxyChans; __global__ void kernelProxyPingPong(int* buff, int rank, int nElem, bool waitWithPoll, int nTries, int* ret) { - DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; + DeviceHandle& proxyChan = 
gChannelOneToOneTestConstProxyChans; volatile int* sendBuff = (volatile int*)buff; int flusher = 0; int rank1Offset = 10000000; @@ -156,16 +156,16 @@ void ProxyChannelOneToOneTest::testPingPong(PingPongTestParams params) { const int nElem = 4 * 1024 * 1024; - std::vector proxyChannels; + std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); setupMeshConnections(proxyChannels, params.useIPC, params.useIB, params.useEthernet, buff.get(), nElem * sizeof(int)); - std::vector> proxyChannelHandles; + std::vector> proxyChannelHandles; for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle()); ASSERT_EQ(proxyChannels.size(), 1); MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannelHandles.data(), - sizeof(DeviceHandle))); + sizeof(DeviceHandle))); proxyService->startProxy(); @@ -201,16 +201,16 @@ void ProxyChannelOneToOneTest::testPingPongPerf(PingPongTestParams params) { const int nElem = 4 * 1024 * 1024; - std::vector proxyChannels; + std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); setupMeshConnections(proxyChannels, params.useIPC, params.useIB, params.useEthernet, buff.get(), nElem * sizeof(int)); - std::vector> proxyChannelHandles; + std::vector> proxyChannelHandles; for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle()); ASSERT_EQ(proxyChannels.size(), 1); MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannelHandles.data(), - sizeof(DeviceHandle))); + sizeof(DeviceHandle))); proxyService->startProxy(); @@ -279,7 +279,7 @@ __global__ void kernelProxyLLPingPong(int* buff, mscclpp::LLPacket* putPktBuf, m int nElem, int nTries, int* ret) { if (rank > 1) return; - DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; + DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; volatile int* buffPtr = (volatile int*)buff; int putOffset = (rank == 0) ? 
0 : 10000000; int getOffset = (rank == 0) ? 10000000 : 0; @@ -343,7 +343,7 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) { const int nElem = 4 * 1024 * 1024; - std::vector proxyChannels; + std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); const size_t nPacket = (nElem * sizeof(int) + sizeof(uint64_t) - 1) / sizeof(uint64_t); @@ -355,13 +355,13 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) { ASSERT_EQ(proxyChannels.size(), 1); - std::vector> proxyChannelHandles; + std::vector> proxyChannelHandles; for (auto& proxyChannel : proxyChannels) { proxyChannelHandles.push_back(proxyChannel.deviceHandle()); } MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannelHandles.data(), - sizeof(DeviceHandle))); + sizeof(DeviceHandle))); mscclpp::DeviceSyncer syncer = {}; MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestProxyChansSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); @@ -410,7 +410,7 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) { const int nElem = 4 * 1024 * 1024; - std::vector proxyChannels; + std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocExtSharedCuda(nElem); const size_t nPacket = (nElem * sizeof(int) + sizeof(uint64_t) - 1) / sizeof(uint64_t); @@ -422,13 +422,13 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) { ASSERT_EQ(proxyChannels.size(), 1); - std::vector> proxyChannelHandles; + std::vector> proxyChannelHandles; for (auto& proxyChannel : proxyChannels) { proxyChannelHandles.push_back(proxyChannel.deviceHandle()); } MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannelHandles.data(), - sizeof(DeviceHandle))); + sizeof(DeviceHandle))); mscclpp::DeviceSyncer syncer = {}; MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestProxyChansSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); diff --git a/test/mscclpp-test/allgather_test.cu 
b/test/mscclpp-test/allgather_test.cu index 714b2858d..3fd65e3d2 100644 --- a/test/mscclpp-test/allgather_test.cu +++ b/test/mscclpp-test/allgather_test.cu @@ -20,8 +20,8 @@ constexpr uint64_t MAGIC = 0xdeadbeef; template using DeviceHandle = mscclpp::DeviceHandle; -__constant__ DeviceHandle constProxyChans[16]; -__constant__ DeviceHandle constRawProxyChan[16]; +__constant__ DeviceHandle constProxyChans[16]; +__constant__ DeviceHandle constRawProxyChan[16]; __constant__ DeviceHandle constSmChans[512]; __constant__ DeviceHandle constSmOutOfPlaceChans[16]; @@ -31,7 +31,7 @@ __global__ void __launch_bounds__(1024) allgather0(int rank, size_t nelemsPerGPU int warpId = threadIdx.x / WARP_SIZE; // Each warp is responsible for one of the remote ranks - DeviceHandle proxyChan = constProxyChans[warpId]; + DeviceHandle proxyChan = constProxyChans[warpId]; // this allgather is really simple and implemented as an alltoall @@ -49,7 +49,7 @@ __global__ void __launch_bounds__(1024) allgather0(int rank, size_t nelemsPerGPU if (threadIdx.x % WARP_SIZE == 0) proxyChan.wait(); } -__device__ void localAllGather(DeviceHandle proxyChan, int rank, int nRanksPerNode, +__device__ void localAllGather(DeviceHandle proxyChan, int rank, int nRanksPerNode, int remoteRank, uint64_t offset, uint64_t size, bool flushAfterSignal = true) { // this allgather algorithm works as follows: // Step 1: GPU rank i sends data to GPU rank (i+1) % nRanksPerNode @@ -129,7 +129,7 @@ __global__ void __launch_bounds__(1024) allgather1(int rank, int nRanksPerNode, int remoteRank = (warpId < rank) ? 
warpId : warpId + 1; // Each warp is responsible for one of the remote ranks - DeviceHandle proxyChan = constProxyChans[warpId]; + DeviceHandle proxyChan = constProxyChans[warpId]; localAllGather(proxyChan, rank, nRanksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); @@ -140,7 +140,7 @@ __global__ void __launch_bounds__(1024) allgather2(int rank, int worldSize, int int remoteRank = (warpId < rank) ? warpId : warpId + 1; // Each warp is responsible for one of the remote ranks - DeviceHandle proxyChan = constProxyChans[warpId]; + DeviceHandle proxyChan = constProxyChans[warpId]; // this allgather is a pipelined and hierarchical one and only works for two nodes // it is implemented as follows: @@ -214,7 +214,7 @@ __global__ void __launch_bounds__(1024) allgather3() { int warpId = threadIdx.x / WARP_SIZE; // Each warp is responsible for one of the remote ranks - DeviceHandle proxyChan = constRawProxyChan[warpId]; + DeviceHandle proxyChan = constRawProxyChan[warpId]; int tid = threadIdx.x; __syncthreads(); @@ -247,7 +247,7 @@ __global__ void __launch_bounds__(1024) allgather4(int rank, int worldSize, int int peerRank = (rank + nRanksPerNode) % worldSize; int peerNodeId = peerRank / nRanksPerNode; int peer = (peerRank < rank) ? 
peerRank : peerRank - 1; - DeviceHandle& proxyChan = constProxyChans[peer]; + DeviceHandle& proxyChan = constProxyChans[peer]; const size_t nBlocksForLocalAllGather = gridDim.x; const size_t rankChunkSize = nelemsPerGPU * sizeof(int); const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode; @@ -512,10 +512,10 @@ class AllGatherProxyService : public mscclpp::BaseProxyService { semaphores_.push_back(std::make_shared(communicator, connection)); return semaphores_.size() - 1; } - std::vector> proxyChannels() { - std::vector> result; + std::vector> proxyChannels() { + std::vector> result; for (auto& semaphore : semaphores_) { - result.push_back(mscclpp::deviceHandle(mscclpp::ProxyChannel(0, semaphore, proxy_))); + result.push_back(mscclpp::deviceHandle(mscclpp::BaseProxyChannel(0, semaphore, proxy_))); } return result; } @@ -722,14 +722,14 @@ void AllGatherTestEngine::allocateBuffer() { } void AllGatherTestEngine::setupConnections() { - std::vector> devProxyChannels; + std::vector> devProxyChannels; if (!isUsingHostOffload(args_.kernelNum)) { setupMeshConnections(devProxyChannels, sendBuff_.get(), args_.maxBytes); - if (devProxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { + if (devProxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } CUDATHROW(cudaMemcpyToSymbol(constProxyChans, devProxyChannels.data(), - sizeof(DeviceHandle) * devProxyChannels.size())); + sizeof(DeviceHandle) * devProxyChannels.size())); setupMeshConnections(smChannels_, sendBuff_.get(), args_.maxBytes, nullptr, 0, ChannelSemantic::PUT, 64); std::vector> smChannelHandles(smChannels_.size()); @@ -770,11 +770,11 @@ void AllGatherTestEngine::setupConnections() { comm_->setup(); }); auto proxyChannels = service->proxyChannels(); - if (proxyChannels.size() > sizeof(constRawProxyChan) / sizeof(DeviceHandle)) { + if (proxyChannels.size() > sizeof(constRawProxyChan) / sizeof(DeviceHandle)) { 
std::runtime_error("unexpected error"); } CUDATHROW(cudaMemcpyToSymbol(constRawProxyChan, proxyChannels.data(), - sizeof(DeviceHandle) * proxyChannels.size())); + sizeof(DeviceHandle) * proxyChannels.size())); } } diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index 84eb694b1..6ba6ce3db 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -12,8 +12,8 @@ template using DeviceHandle = mscclpp::DeviceHandle; -__constant__ DeviceHandle constDevFstRoundChans[16]; -__constant__ DeviceHandle constDevSndRoundChans[16]; +__constant__ DeviceHandle constDevFstRoundChans[16]; +__constant__ DeviceHandle constDevSndRoundChans[16]; __constant__ DeviceHandle constSmInPlaceChans[8]; __constant__ DeviceHandle constSmOutOfPlaceChans[8]; @@ -93,8 +93,8 @@ __device__ void localReduceScatter(int* buff, int* scratch, int rank, int nRanks int peerSendId = (remoteSendToRank < rank) ? remoteSendToRank : remoteSendToRank - 1; int peerRecvId = (remoteRecvFromRank < rank) ? remoteRecvFromRank : remoteRecvFromRank - 1; - DeviceHandle& devFstSendChan = constDevFstRoundChans[peerSendId]; - DeviceHandle& devFstRecvChan = constDevFstRoundChans[peerRecvId]; + DeviceHandle& devFstSendChan = constDevFstRoundChans[peerSendId]; + DeviceHandle& devFstRecvChan = constDevFstRoundChans[peerRecvId]; size_t srcOffset = (((rankIndexInNode + i) % nRanksPerNode + startChunkIndex) * chunkSize + offsetInChunk) * sizeof(int); size_t dstOffset = rank * chunkSize * sizeof(int); @@ -109,7 +109,7 @@ __device__ void localReduceScatter(int* buff, int* scratch, int rank, int nRanks int prePeerRecvId = (preRemoteRecvFromRank < rank) ? 
preRemoteRecvFromRank : preRemoteRecvFromRank - 1; // overlap communication and computation - DeviceHandle& preDevFstRecvChan = constDevFstRoundChans[prePeerRecvId]; + DeviceHandle& preDevFstRecvChan = constDevFstRoundChans[prePeerRecvId]; if (isComm) { preDevFstRecvChan.wait(); devFstSendChan.putWithSignal(dstOffset, srcOffset, nelems * sizeof(int)); @@ -156,7 +156,7 @@ __device__ void reduceScatter(int* buff, int* scratch, int rank, int nRanksPerNo int peerNodeId = peerRank / nRanksPerNode; int isComm = (threadIdx.x == 0) && (blockIdx.x == 0); int peer = (peerRank < rank) ? peerRank : peerRank - 1; - DeviceHandle& proxyChan = constDevFstRoundChans[peer]; + DeviceHandle& proxyChan = constDevFstRoundChans[peer]; if (peerNodeId == rank / nRanksPerNode) { localReduceScatter(buff, scratch, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize); return; @@ -227,8 +227,8 @@ __device__ void localAllGather(int rank, int nRanksPerNode, uint64_t offset, uin int peerSendId = (remoteSendToRank < rank) ? remoteSendToRank : remoteSendToRank - 1; int peerRecvId = (remoteRecvFromRank < rank) ? remoteRecvFromRank : remoteRecvFromRank - 1; - DeviceHandle& devSendChan = constDevSndRoundChans[peerSendId]; - DeviceHandle& devRecvChan = constDevSndRoundChans[peerRecvId]; + DeviceHandle& devSendChan = constDevSndRoundChans[peerSendId]; + DeviceHandle& devRecvChan = constDevSndRoundChans[peerRecvId]; // wait for the data from GPU (rank-i) % nranksPerNode to arrive devSendChan.putWithSignal(offset, size); devRecvChan.wait(); @@ -251,7 +251,7 @@ __device__ void allGather(int rank, int worldSize, int nRanksPerNode, size_t nel int peerRank = (rank + nRanksPerNode) % worldSize; int peerNodeId = peerRank / nRanksPerNode; int peer = (peerRank < rank) ? 
peerRank : peerRank - 1; - DeviceHandle& proxyChan = constDevSndRoundChans[peer]; + DeviceHandle& proxyChan = constDevSndRoundChans[peer]; if (peerNodeId == rank / nRanksPerNode) { localAllGather(rank, nRanksPerNode, rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int)); @@ -456,7 +456,7 @@ __device__ void reduceScatterSm(int* buff, int* scratch, int rank, int nRanksPer int isComm = (threadIdx.x == 0) && ((int)blockIdx.x == nBlocksForReduceScatter); int peer = (peerRank < rank) ? peerRank : peerRank - 1; int nBlocksRemain = gridDim.x - nBlocksForReduceScatter; - DeviceHandle& proxyChan = constDevFstRoundChans[peer]; + DeviceHandle& proxyChan = constDevFstRoundChans[peer]; if (peerNodeId == rank / nRanksPerNode) { localReduceScatterSm(buff, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize, gridDim.x); return; @@ -631,7 +631,7 @@ __device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t n int peerRank = (rank + nRanksPerNode) % worldSize; int peerNodeId = peerRank / nRanksPerNode; int peer = (peerRank < rank) ? peerRank : peerRank - 1; - DeviceHandle& proxyChan = constDevSndRoundChans[peer]; + DeviceHandle& proxyChan = constDevSndRoundChans[peer]; const size_t nBlocksForLocalAllGather = gridDim.x / (nRanksPerNode - 1) * (nRanksPerNode - 1); const size_t rankChunkSize = nelemsPerGPU * sizeof(int); const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode; @@ -681,7 +681,7 @@ __global__ void __launch_bounds__(1024) int remoteRank = (peerId < rank) ? peerId : peerId + 1; // 1st communication phase: send data to the scratch buffer of the peer associated with this block - DeviceHandle& devFstRoundChan = constDevFstRoundChans[peerId]; + DeviceHandle& devFstRoundChan = constDevFstRoundChans[peerId]; Chunk toPeerChunk = getChunk(nelems, worldSize, remoteRank); // Now we need to figure out the offset of this chunk in the scratch buffer of the destination. 
// The destination will have allocated a scratch buffer of size numPeers() * toPeerChunk.size and @@ -699,7 +699,7 @@ __global__ void __launch_bounds__(1024) deviceSyncer.sync(gridDim.x); // Local reduction: every block reduces a slice of each chunk in the scratch buffer into the user buffer - DeviceHandle& devSndRoundChan = constDevSndRoundChans[peerId]; + DeviceHandle& devSndRoundChan = constDevSndRoundChans[peerId]; Chunk rankChunk = getChunk(nelems, worldSize, rank); int* chunk = buff + rankChunk.offset; int numPeers = gridDim.x / BLOCKS_PER_PEER; @@ -733,10 +733,10 @@ __global__ void __launch_bounds__(1024) allreduce1(int* buff, int* scratch, int int peerSendId = (remoteSendRank < rank) ? remoteSendRank : remoteSendRank - 1; int peerRecvId = (remoteRecvRank < rank) ? remoteRecvRank : remoteRecvRank - 1; - DeviceHandle& devFstSendChan = constDevFstRoundChans[peerSendId]; - DeviceHandle& devFstRecvChan = constDevFstRoundChans[peerRecvId]; - DeviceHandle& devSndSendChan = constDevSndRoundChans[peerSendId]; - DeviceHandle& devSndRecvChan = constDevSndRoundChans[peerRecvId]; + DeviceHandle& devFstSendChan = constDevFstRoundChans[peerSendId]; + DeviceHandle& devFstRecvChan = constDevFstRoundChans[peerRecvId]; + DeviceHandle& devSndSendChan = constDevSndRoundChans[peerSendId]; + DeviceHandle& devSndRecvChan = constDevSndRoundChans[peerRecvId]; // Step 1 size_t chunkIndex = (rank + worldSize - 1) % worldSize; @@ -850,7 +850,7 @@ __global__ void __launch_bounds__(1024) // Channel to a remote peer that has the same local rank as me int localRank = rank % nRanksPerNode; - DeviceHandle proxyChan = constDevFstRoundChans[localRank]; + DeviceHandle proxyChan = constDevFstRoundChans[localRank]; // Flag for packets. 
Initially 1 uint32_t flag = (uint32_t)globalFlag; @@ -1309,7 +1309,7 @@ void AllReduceTestEngine::setupConnections() { [](const mscclpp::SmChannel& smChannel) { return mscclpp::deviceHandle(smChannel); }); }; if (isUsePacket()) { - std::vector> proxyChannels; + std::vector> proxyChannels; const size_t nPacket = (args_.maxBytes + sizeof(uint64_t) - 1) / sizeof(uint64_t); if (args_.kernelNum == 6 || args_.kernelNum == 7) { @@ -1332,7 +1332,7 @@ void AllReduceTestEngine::setupConnections() { if (smOutOfPlaceChannels_.size() > sizeof(constSmOutOfPlaceChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } - if (proxyChannels.size() > sizeof(constDevFstRoundChans) / sizeof(DeviceHandle)) { + if (proxyChannels.size() > sizeof(constDevFstRoundChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } @@ -1341,27 +1341,27 @@ void AllReduceTestEngine::setupConnections() { CUDATHROW(cudaMemcpyToSymbol(constSmOutOfPlaceChans, smChannelDeviceHandles.data(), sizeof(DeviceHandle) * smChannelDeviceHandles.size())); CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, proxyChannels.data(), - sizeof(DeviceHandle) * proxyChannels.size())); + sizeof(DeviceHandle) * proxyChannels.size())); } } else { - std::vector> fstRoundChannels; - std::vector> sndRoundChannels; + std::vector> fstRoundChannels; + std::vector> sndRoundChannels; // Send data from local inputBuff to remote scratchBuff (out-of-place) setupMeshConnections(fstRoundChannels, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes); - if (fstRoundChannels.size() > sizeof(constDevFstRoundChans) / sizeof(DeviceHandle)) { + if (fstRoundChannels.size() > sizeof(constDevFstRoundChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, fstRoundChannels.data(), - sizeof(DeviceHandle) * fstRoundChannels.size())); + sizeof(DeviceHandle) * fstRoundChannels.size())); // Send data from local inputBuff to remote inputBuff 
(in-place) setupMeshConnections(sndRoundChannels, inputBuff_.get(), args_.maxBytes); - if (sndRoundChannels.size() > sizeof(constDevSndRoundChans) / sizeof(DeviceHandle)) { + if (sndRoundChannels.size() > sizeof(constDevSndRoundChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } CUDATHROW(cudaMemcpyToSymbol(constDevSndRoundChans, sndRoundChannels.data(), - sizeof(DeviceHandle) * sndRoundChannels.size())); + sizeof(DeviceHandle) * sndRoundChannels.size())); setupMeshConnections(smOutOfPlaceChannels_, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes); if (smOutOfPlaceChannels_.size() > sizeof(constSmOutOfPlaceChans) / sizeof(DeviceHandle)) { diff --git a/test/mscclpp-test/alltoall_test.cu b/test/mscclpp-test/alltoall_test.cu index a1881af91..d3c8d891a 100644 --- a/test/mscclpp-test/alltoall_test.cu +++ b/test/mscclpp-test/alltoall_test.cu @@ -8,7 +8,7 @@ template using DeviceHandle = mscclpp::DeviceHandle; -__constant__ DeviceHandle constProxyChans[16]; +__constant__ DeviceHandle constProxyChans[16]; __device__ mscclpp::DeviceSyncer deviceSyncer; void* localRecvBuff; void* localSendBuff; @@ -16,7 +16,7 @@ void* localSendBuff; __device__ void localAlltoall(int rank, int nRanksPerNode, size_t nElements) { int remoteRank = ((int)blockIdx.x < rank) ? blockIdx.x : blockIdx.x + 1; for (int i = 1; i < nRanksPerNode; i++) { - DeviceHandle proxyChan = constProxyChans[blockIdx.x]; + DeviceHandle proxyChan = constProxyChans[blockIdx.x]; if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank + i) % nRanksPerNode) { proxyChan.putWithSignalAndFlush(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), nElements * sizeof(int)); @@ -31,7 +31,7 @@ __device__ void localAlltoall(int rank, int nRanksPerNode, size_t nElements) { __global__ void __launch_bounds__(1024) alltoall0(int rank, size_t nElements) { int remoteRank = ((int)blockIdx.x < rank) ? 
blockIdx.x : blockIdx.x + 1; - DeviceHandle proxyChan = constProxyChans[blockIdx.x]; + DeviceHandle proxyChan = constProxyChans[blockIdx.x]; if (threadIdx.x == 0) { proxyChan.putWithSignal(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), nElements * sizeof(int)); @@ -148,14 +148,14 @@ void AllToAllTestEngine::allocateBuffer() { } void AllToAllTestEngine::setupConnections() { - std::vector> proxyChannels; + std::vector> proxyChannels; setupMeshConnections(proxyChannels, sendBuff_.get(), args_.maxBytes, recvBuff_.get(), args_.maxBytes); - if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { + if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { std::runtime_error("unexpected error"); } CUDATHROW(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(), - sizeof(DeviceHandle) * proxyChannels.size())); + sizeof(DeviceHandle) * proxyChannels.size())); } std::vector AllToAllTestEngine::getSendBuff() { return {sendBuff_.get()}; } diff --git a/test/mscclpp-test/common.cc b/test/mscclpp-test/common.cc index 899823f7d..c92312806 100644 --- a/test/mscclpp-test/common.cc +++ b/test/mscclpp-test/common.cc @@ -397,7 +397,7 @@ void BaseTestEngine::setupMeshConnectionsInternal( // Create mesh connections between all ranks. If recvBuff is nullptr, assume in-place. 
// TODO(saemal): retrun the actual vector instead of void -void BaseTestEngine::setupMeshConnections(std::vector>& proxyChannels, +void BaseTestEngine::setupMeshConnections(std::vector>& proxyChannels, void* inputBuff, size_t inputBuffBytes, void* outputBuff, size_t outputBuffBytes, SetupChannelFunc setupChannel) { mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc; @@ -419,9 +419,9 @@ void BaseTestEngine::setupMeshConnections(std::vector(chanService_); for (size_t i = 0; i < connections.size(); ++i) { - proxyChannels.push_back(mscclpp::deviceHandle(mscclpp::SimpleProxyChannel( - service->proxyChannel(service->buildAndAddSemaphore(*comm_, connections[i])), - service->addMemory(remoteRegMemories[i].get()), service->addMemory(inputBufRegMem)))); + proxyChannels.push_back(mscclpp::deviceHandle( + service->proxyChannel(service->buildAndAddSemaphore(*comm_, connections[i]), + service->addMemory(remoteRegMemories[i].get()), service->addMemory(inputBufRegMem)))); } } @@ -468,7 +468,7 @@ void BaseTestEngine::setupMeshConnections(std::vector& smCha } void BaseTestEngine::setupMeshConnections(std::vector& smChannels, - std::vector>& proxyChannels, + std::vector>& proxyChannels, void* inputBuff, size_t inputBuffBytes, void* putPacketBuff, size_t putPacketBuffBytes, void* getPacketBuff, size_t getPacketBuffBytes, void* outputBuff, size_t outputBuffBytes) { @@ -522,9 +522,9 @@ void BaseTestEngine::setupMeshConnections(std::vector& smCha if (putPacketBuff == nullptr || getPacketBuff == nullptr) { throw std::runtime_error("IB transport requires putPacketBuff and getPacketBuff"); } - proxyChannels.emplace_back(mscclpp::deviceHandle(mscclpp::SimpleProxyChannel( - service->proxyChannel(connIdToSemId[cid]), service->addMemory(remoteRegMemories[cid].get()), - service->addMemory(putPacketBufRegMem)))); + proxyChannels.emplace_back(mscclpp::deviceHandle( + service->proxyChannel(connIdToSemId[cid], service->addMemory(remoteRegMemories[cid].get()), + 
service->addMemory(putPacketBufRegMem)))); } } } diff --git a/test/mscclpp-test/common.hpp b/test/mscclpp-test/common.hpp index 7e3e8c423..d7408cc29 100644 --- a/test/mscclpp-test/common.hpp +++ b/test/mscclpp-test/common.hpp @@ -113,14 +113,14 @@ class BaseTestEngine { const mscclpp::RegisteredMemory&)>; template using DeviceHandle = mscclpp::DeviceHandle; - void setupMeshConnections(std::vector>& proxyChannels, void* inputBuff, + void setupMeshConnections(std::vector>& proxyChannels, void* inputBuff, size_t inputBuffBytes, void* outputBuff = nullptr, size_t outputBuffBytes = 0, SetupChannelFunc setupChannel = nullptr); void setupMeshConnections(std::vector& smChannels, void* inputBuff, size_t inputBuffBytes, void* outputBuff = nullptr, size_t outputBuffBytes = 0, ChannelSemantic semantic = ChannelSemantic::PUT, size_t nChannelPerConnection = 1); void setupMeshConnections(std::vector& smChannels, - std::vector>& proxyChannels, void* inputBuff, + std::vector>& proxyChannels, void* inputBuff, size_t inputBuffBytes, void* putPacketBuff = nullptr, size_t putPacketBuffBytes = 0, void* getPacketBuff = nullptr, size_t getPacketBuffBytes = 0, void* outputBuff = nullptr, size_t outputBuffBytes = 0); From 7a3dcb0627b9f5e2751ee80f99e308fc60235a27 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sat, 7 Dec 2024 08:57:45 -0800 Subject: [PATCH 2/2] Setup pipeline for mscclpp over nccl (#401) Setup pipeline for mscclpp over nccl Run `all_reduce_perf` via nccl API --- .azure-pipelines/nccl-api-test.yaml | 168 ++++++++++++++++++++++++++++ docker/base-dev-x.dockerfile | 1 + pyproject.toml | 2 +- test/deploy/deploy.sh | 13 ++- test/deploy/hostfile_ci | 1 + 5 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 .azure-pipelines/nccl-api-test.yaml create mode 100644 test/deploy/hostfile_ci diff --git a/.azure-pipelines/nccl-api-test.yaml b/.azure-pipelines/nccl-api-test.yaml new file mode 100644 index 000000000..e3d537fe4 --- /dev/null +++ 
b/.azure-pipelines/nccl-api-test.yaml @@ -0,0 +1,168 @@ +trigger: +- main + +pr: + branches: + include: + - main + drafts: false + +jobs: +- job: NcclTest + displayName: Run MSCCLPP over NCCL Test + strategy: + matrix: + cuda11: + containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda11.8 + cuda12: + containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.4 + pool: + name: msccl-ci + container: + image: $[ variables['containerImage'] ] + + steps: + - checkout: self + - checkout: git://One/msccl-users + - task: Bash@3 + name: Build + displayName: Build + inputs: + targetType: 'inline' + script: | + mkdir build && cd build + cmake -DCMAKE_BUILD_TYPE=Release -DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON .. + make -j + workingDirectory: '$(System.DefaultWorkingDirectory)/mscclpp' + + - task: DownloadSecureFile@1 + name: SshKeyFile + displayName: Download key file + inputs: + secureFile: mscclpp.pem + + - task: Bash@3 + name: InstallPackages + displayName: Install Packages + inputs: + targetType: 'inline' + script: | + sudo apt-get update -y + sudo apt-get install pssh -y + curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash + + - task: AzureCLI@2 + name: StartVMSS + displayName: Start VMSS + inputs: + azureSubscription: mscclpp-ci + scriptType: bash + scriptLocation: inlineScript + inlineScript: | + az vmss start --name mscclpp-ci --resource-group mscclpp + + - task: Bash@3 + name: DeployTestEnv + displayName: Deploy Test Env + inputs: + targetType: filePath + filePath: mscclpp/test/deploy/deploy.sh + arguments: "nccltest-single-node" + workingDirectory: $(System.DefaultWorkingDirectory)/mscclpp + + - task: Bash@3 + name: CopyMscclUsers + displayName: Copy msccl-users + inputs: + targetType: 'inline' + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci + ROOT_DIR=$(System.DefaultWorkingDirectory)/msccl-users + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + 
DST_DIR="/tmp/mscclpp/msccl-users" + parallel-scp -t 0 -r -h ${HOSTFILE} -x "-i ${KeyFilePath}" -O $SSH_OPTION ${ROOT_DIR} ${DST_DIR} + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: InstallMscclTools + displayName: Install msccl-tools + inputs: + targetType: 'inline' + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \ + -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c " \ + cd /root/mscclpp; \ + git clone https://github.com/Azure/msccl-tools.git; \ + cd /root/mscclpp/msccl-tools; pip3 install ."' + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: GenerateExecutionFile + displayName: Generate execution file + inputs: + targetType: 'inline' + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci + ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \ + -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\ + cd /root/mscclpp/msccl-users; \ + mkdir -p execution-files; \ + cd /root/mscclpp/msccl-users; \ + bash algos/mscclpp_a100/generate_execution_plan.sh"' + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: InstallNcclTests + displayName: Install NCCL Tests + inputs: + targetType: 'inline' + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci + ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \ + -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c " \ + cd; git clone 
https://github.com/NVIDIA/nccl-tests.git; \ + cd nccl-tests; \ + MPI=1 MPI_HOME=/usr/local/mpi make -j"' + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: RunNcclAllreduceTest + displayName: Run NCCL Allreduce Test + inputs: + targetType: 'inline' + script: | + set -e + HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci + ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp + SSH_OPTION="StrictHostKeyChecking=no" + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} + parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \ + -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\ + cd /root/mscclpp; \ + mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_reduce_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"' + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: AzureCLI@2 + name: StopVMSS + displayName: Deallocate VMSS + condition: always() + inputs: + azureSubscription: mscclpp-ci + scriptType: bash + scriptLocation: inlineScript + inlineScript: | + az vmss deallocate --name mscclpp-ci --resource-group mscclpp diff --git a/docker/base-dev-x.dockerfile b/docker/base-dev-x.dockerfile index 5aeaa4142..26216711e 100644 --- a/docker/base-dev-x.dockerfile +++ b/docker/base-dev-x.dockerfile @@ -28,6 +28,7 @@ ADD . 
/tmp/mscclpp WORKDIR /tmp/mscclpp ARG TARGET="cuda12.1" RUN target_type=$(echo $TARGET | sed 's/\.[0-9]*$//') && \ + python3 -m pip install --no-cache-dir --upgrade pip && \ python3 -m pip install --no-cache-dir -r python/requirements_${target_type}.txt # Set PATH diff --git a/pyproject.toml b/pyproject.toml index 99fcb4c17..b60ac4209 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ name = "mscclpp" version = "0.5.2" [tool.scikit-build] -cmake.minimum-version = "3.25.0" +cmake.version = ">=3.25.0" build-dir = "build/{wheel_tag}" wheel.packages = ["python/mscclpp", "python/mscclpp_benchmark"] wheel.install-dir = "mscclpp" diff --git a/test/deploy/deploy.sh b/test/deploy/deploy.sh index dee5af2d6..19d545ec6 100644 --- a/test/deploy/deploy.sh +++ b/test/deploy/deploy.sh @@ -1,9 +1,20 @@ set -e +# get parameter from $1 +TEST_NAME=$1 + KeyFilePath=${SSHKEYFILE_SECUREFILEPATH} ROOT_DIR="${SYSTEM_DEFAULTWORKINGDIRECTORY}/" +if [ "${TEST_NAME}" == "nccltest-single-node" ]; then + ROOT_DIR="${ROOT_DIR}/mscclpp" + SYSTEM_DEFAULTWORKINGDIRECTORY="${SYSTEM_DEFAULTWORKINGDIRECTORY}/mscclpp" +fi DST_DIR="/tmp/mscclpp" -HOSTFILE="${SYSTEM_DEFAULTWORKINGDIRECTORY}/test/deploy/hostfile" +if [ "${TEST_NAME}" == "nccltest-single-node" ]; then + HOSTFILE="${SYSTEM_DEFAULTWORKINGDIRECTORY}/test/deploy/hostfile_ci" +else + HOSTFILE="${SYSTEM_DEFAULTWORKINGDIRECTORY}/test/deploy/hostfile" +fi SSH_OPTION="StrictHostKeyChecking=no" chmod 400 ${KeyFilePath} diff --git a/test/deploy/hostfile_ci b/test/deploy/hostfile_ci new file mode 100644 index 000000000..bb2341705 --- /dev/null +++ b/test/deploy/hostfile_ci @@ -0,0 +1 @@ +azureuser@10.0.0.4 \ No newline at end of file