From d97f31274eee1c0d16c305f7af41ede16796fb8d Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Sun, 7 Apr 2024 11:48:45 +0000
Subject: [PATCH] WIP

---
 include/mscclpp/core.hpp          |   4 +
 include/mscclpp/executor.hpp      |   7 +-
 include/mscclpp/packet_device.hpp |  15 ++++
 python/mscclpp/__init__.py        |   1 +
 python/mscclpp/executor.cpp       |   9 ++-
 src/executor/execution_kernel.cu  |  25 ++++--
 src/executor/executor.cc          |  25 ++++--
 src/include/execution_common.hpp  |   4 +
 src/include/execution_kernel.hpp  | 129 +++++++++++++++++++++++-------
 9 files changed, 174 insertions(+), 45 deletions(-)

diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp
index 50a922bc3..456020975 100644
--- a/include/mscclpp/core.hpp
+++ b/include/mscclpp/core.hpp
@@ -760,6 +760,10 @@ DeviceHandle<std::remove_reference_t<T>> deviceHandle(T&& t) {
   return t.deviceHandle();
 }
 
+/// Packet value type.
+template <class T>
+using PacketValType = typename T::ValueType;
+
 }  // namespace mscclpp
 
 namespace std {
diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp
index 21087a762..60a68fbb2 100644
--- a/include/mscclpp/executor.hpp
+++ b/include/mscclpp/executor.hpp
@@ -17,6 +17,11 @@ enum class DataType {
   FLOAT32,
 };
 
+enum class PacketType {
+  LL8,
+  LL16,
+};
+
 class ExecutionPlan {
  public:
   ExecutionPlan(std::string planPath);
@@ -37,7 +42,7 @@ class Executor {
   ~Executor();
 
   void execute(int rank, void* sendbuff, void* recvBuff, size_t sendBuffSize, size_t recvBuffSize, DataType dataType,
-               int nthreads, const ExecutionPlan& plan, cudaStream_t stream);
+               int nthreads, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType = PacketType::LL16);
 
  private:
   struct Impl;
diff --git a/include/mscclpp/packet_device.hpp b/include/mscclpp/packet_device.hpp
index 11f63b53f..7678c81b5 100644
--- a/include/mscclpp/packet_device.hpp
+++ b/include/mscclpp/packet_device.hpp
@@ -24,12 +24,20 @@ union alignas(16) LL16Packet {
     uint32_t data2;
     uint32_t flag2;
   };
+  using ValueType = uint2;
 #if defined(MSCCLPP_DEVICE_COMPILE)
   ulonglong2 raw_;
 
   MSCCLPP_DEVICE_INLINE LL16Packet() {}
 
+  MSCCLPP_DEVICE_INLINE LL16Packet(uint2 val, uint32_t flag) {
+    data1 = val.x;
+    flag1 = flag;
+    data2 = val.y;
+    flag2 = flag;
+  }
+
   /// Write 8 bytes of data to the packet.
   /// @param val1 The first 4-byte data to write.
   /// @param val2 The second 4-byte data to write.
@@ -95,10 +103,17 @@ union alignas(8) LL8Packet {
     uint32_t flag;
   };
   uint64_t raw_;
+
+  using ValueType = uint32_t;
 #if defined(MSCCLPP_DEVICE_COMPILE)
   MSCCLPP_DEVICE_INLINE LL8Packet() {}
 
+  MSCCLPP_DEVICE_INLINE LL8Packet(uint32_t val, uint32_t flag) {
+    data = val;
+    this->flag = flag;
+  }
+
   MSCCLPP_DEVICE_INLINE void write(uint32_t val, uint32_t flag) {
 #if defined(MSCCLPP_DEVICE_CUDA)
     asm volatile("st.volatile.global.v2.u32 [%0], {%1,%2};" ::"l"(&raw_), "r"(val), "r"(flag));
diff --git a/python/mscclpp/__init__.py b/python/mscclpp/__init__.py
index 0c8f7eb3b..0acc55fc5 100644
--- a/python/mscclpp/__init__.py
+++ b/python/mscclpp/__init__.py
@@ -22,6 +22,7 @@
     DataType,
     Executor,
     ExecutionPlan,
+    PacketType,
     version,
     is_nvls_supported,
 )
diff --git a/python/mscclpp/executor.cpp b/python/mscclpp/executor.cpp
index 5276e3336..f57a4294b 100644
--- a/python/mscclpp/executor.cpp
+++ b/python/mscclpp/executor.cpp
@@ -18,6 +18,8 @@ void register_executor(nb::module_& m) {
       .value("float16", DataType::FLOAT16)
       .value("float32", DataType::FLOAT32);
 
+  nb::enum_<PacketType>(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16);
+
   nb::class_<ExecutionPlan>(m, "ExecutionPlan").def(nb::init<std::string>(), nb::arg("planPath"));
 
   nb::class_<Executor>(m, "Executor")
@@ -25,10 +27,11 @@ void register_executor(nb::module_& m) {
       .def(
           "execute",
          [](Executor* self, int rank, uintptr_t sendbuff, uintptr_t recvBuff, size_t sendBuffSize, size_t recvBuffSize,
-             DataType dataType, int nthreads, const ExecutionPlan& plan, uintptr_t stream) {
+             DataType dataType, int nthreads, const ExecutionPlan& plan, uintptr_t stream, PacketType packetType) {
             self->execute(rank, reinterpret_cast<void*>(sendbuff), reinterpret_cast<void*>(recvBuff), sendBuffSize,
-                          recvBuffSize, dataType, nthreads, plan, (cudaStream_t)stream);
+                          recvBuffSize, dataType, nthreads, plan, (cudaStream_t)stream, packetType);
           },
           nb::arg("rank"), nb::arg("sendbuff"), nb::arg("recvBuff"), nb::arg("sendBuffSize"), nb::arg("recvBuffSize"),
-          nb::arg("dataType"), nb::arg("nthreads"), nb::arg("plan"), nb::arg("stream"));
+          nb::arg("dataType"), nb::arg("nthreads"), nb::arg("plan"), nb::arg("stream"),
+          nb::arg("packetType") = PacketType::LL16);
 }
diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu
index d5e07a3da..7aca5b1ed 100644
--- a/src/executor/execution_kernel.cu
+++ b/src/executor/execution_kernel.cu
@@ -5,27 +5,36 @@
 
 #if defined(MSCCLPP_DEVICE_CUDA)
 namespace mscclpp {
+
+template <typename PacketType>
 void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
                                    DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize,
-                                   cudaStream_t stream) {
+                                   cudaStream_t stream, uint32_t flag) {
   switch (dataType) {
     case DataType::INT32:
-      executionKernel<<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (int32_t*)src, (int32_t*)dst,
-                                                                          (int32_t*)scratch, plan);
+      executionKernel<int32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
+          rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, plan, flag);
       break;
     case DataType::UINT32:
-      executionKernel<<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
-          rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan);
+      executionKernel<uint32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
+          rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan, flag);
       break;
     case DataType::FLOAT16:
-      executionKernel
-          <<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (half*)src, (half*)dst, (half*)scratch, plan);
+      executionKernel<half, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (half*)src, (half*)dst,
+                                                                                            (half*)scratch, plan, flag);
       break;
     case DataType::FLOAT32:
-      executionKernel
-          <<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (float*)src, (float*)dst, (float*)scratch, plan);
+      executionKernel<float, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (float*)src, (float*)dst,
+                                                                                             (float*)scratch, plan, flag);
       break;
   }
 }
+
+template void ExecutionKernel::launchKernel<LL16Packet>(int rank, int nthreadblocks, int nthreads, void* src, void* dst,
+                                                         void* scratch, DataType dataType, DeviceExecutionPlan* plan,
+                                                         size_t sharedMemSize, cudaStream_t stream, uint32_t flag);
+template void ExecutionKernel::launchKernel<LL8Packet>(int rank, int nthreadblocks, int nthreads, void* src, void* dst,
+                                                        void* scratch, DataType dataType, DeviceExecutionPlan* plan,
+                                                        size_t sharedMemSize, cudaStream_t stream, uint32_t flag);
 }  // namespace mscclpp
 #endif
diff --git a/src/executor/executor.cc b/src/executor/executor.cc
index 1d6d9305e..d775cd593 100644
--- a/src/executor/executor.cc
+++ b/src/executor/executor.cc
@@ -258,12 +258,24 @@ struct Executor::Impl {
   }
 
   void launchKernel(ExecutionContext& context, int rank, int nthreadsPerBlock, void* sendbuff, void* recvbuff,
-                    DataType dataType, cudaStream_t stream) {
+                    DataType dataType, cudaStream_t stream, PacketType packetType) {
+    static uint32_t flag = 0;
     int nthreadblocks = context.deviceExecutionPlans.size();
     size_t sharedMemSize = sizeof(DeviceExecutionPlan);
-    ExecutionKernel::launchKernel(
-        rank, nthreadblocks, nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), dataType,
-        (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream);
+    switch (packetType) {
+      case PacketType::LL16:
+        ExecutionKernel::launchKernel<LL16Packet>(
+            rank, nthreadblocks, nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), dataType,
+            (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, ++flag);
+        break;
+      case PacketType::LL8:
+        ExecutionKernel::launchKernel<LL8Packet>(
+            rank, nthreadblocks, nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), dataType,
+            (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, ++flag);
+        break;
+      default:
+        throw std::runtime_error("Invalid packet type");
+    }
   }
 };
 
@@ -271,10 +283,11 @@ Executor::Executor(std::shared_ptr<Communicator> comm, int nranksPerNode)
     : impl_(std::make_unique<Impl>(comm, nranksPerNode)) {}
 
 void Executor::execute(int rank, void* sendbuff, void* recvBuff, size_t sendBuffSize, size_t recvBuffSize,
-                       DataType dataType, int nthreads, const ExecutionPlan& plan, cudaStream_t stream) {
+                       DataType dataType, int nthreads, const ExecutionPlan& plan, cudaStream_t stream,
+                       PacketType packetType) {
   ExecutionContext context =
       this->impl_->setupExecutionContext(rank, sendbuff, recvBuff, sendBuffSize, recvBuffSize, plan, stream);
-  this->impl_->launchKernel(context, rank, nthreads, sendbuff, recvBuff, dataType, stream);
+  this->impl_->launchKernel(context, rank, nthreads, sendbuff, recvBuff, dataType, stream, packetType);
 }
 
 Executor::~Executor() = default;
diff --git a/src/include/execution_common.hpp b/src/include/execution_common.hpp
index 59d341612..5a63859b8 100644
--- a/src/include/execution_common.hpp
+++ b/src/include/execution_common.hpp
@@ -27,13 +27,17 @@ enum class ChannelType : uint8_t {
 enum class OperationType : uint8_t {
   BARRIER,
   PUT,
+  PUT_PACKET,
   GET,
   COPY,
+  COPY_PACKET,
   SIGNAL,
   WAIT,
   FLUSH,
   REDUCE,
+  REDUCE_PACKET,
   REDUCE_SEND,
+  REDUCE_SEND_PACKET,
   READ_REDUCE_COPY,
   READ_REDUCE_COPY_SEND,
 };
diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp
index 833130fc9..023ca1a15 100644
--- a/src/include/execution_kernel.hpp
+++ b/src/include/execution_kernel.hpp
@@ -5,6 +5,7 @@
 #define MSCCLPP_EXECUTION_KERNEL_HPP_
 
 #include <mscclpp/executor.hpp>
+#include <mscclpp/packet_device.hpp>
 #include <mscclpp/proxy_channel.hpp>
 #include <mscclpp/sm_channel.hpp>
 
@@ -102,7 +103,7 @@ MSCCLPP_DEVICE_INLINE uint32_t add_vectors(uint32_t a, uint32_t b) {
 }
 
 template <>
-MSCCLPP_DEVICE_INLINE __attribute__((unused)) uint32_t add_vectors<__half>(uint32_t a, uint32_t b) {
+MSCCLPP_DEVICE_INLINE uint32_t add_vectors<__half>(uint32_t a, uint32_t b) {
   return add_vectors_helper<__half2>(a, b);
 }
 
@@ -112,6 +113,7 @@ MSCCLPP_DEVICE_INLINE __attribute__((unused)) uint32_t add_vectors<__half>(uint3
 
 namespace mscclpp {
 #if defined(MSCCLPP_DEVICE_COMPILE)
+
 template <typename T>
 MSCCLPP_DEVICE_INLINE T* getBuffer(T* input, T* output, T* scratch, BufferType bufferType) {
   if (bufferType == BufferType::INPUT) {
@@ -158,11 +160,11 @@ MSCCLPP_DEVICE_INLINE void handleGet(DeviceHandle<SmChannel>& smChannel, uint32_
 }
 
 template <typename T>
-MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* input, uint32_t inputOffsetByBytes, T* output,
-                                                    uint32_t outputOffsetByBytes, DeviceHandle<SmChannel>* smChannels,
-                                                    uint8_t* srcChannelIndexes, uint8_t* dstChannelIndexes,
-                                                    uint32_t* srcOffsets, uint32_t* dstOffsets, int nSrcChannels,
-                                                    int nDstChannels, uint32_t size, bool sendToRemote = true) {
+MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* output, uint32_t outputOffsetByBytes, T* input,
+                                                    uint32_t inputOffsetByBytes, DeviceHandle<SmChannel>* smChannels,
+                                                    uint8_t* dstChannelIndexes, uint8_t* srcChannelIndexes,
+                                                    uint32_t* dstOffsets, uint32_t* srcOffsets, int nDstChannels,
+                                                    int nSrcChannels, uint32_t size, bool sendToRemote = true) {
   const size_t nInt4 = size / sizeof(int4);
   const size_t inputOffset4 = inputOffsetByBytes / sizeof(int4);
   const size_t outputOffset4 = outputOffsetByBytes / sizeof(int4);
@@ -204,9 +206,59 @@ MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* input, uint32_t inputOffs
   }
 }
 
-template <typename T>
+template <typename PacketType>
+MSCCLPP_DEVICE_INLINE void handlePutPacket(uint32_t inputOffsetByBytes, DeviceHandle<SmChannel>* smChannels,
+                                           uint8_t* dstChannelIndexes, uint32_t* dstOffsets, int nDstChannels,
+                                           uint32_t size, uint32_t flag) {
+  for (int index = 0; index < nDstChannels; ++index) {
+    smChannels[dstChannelIndexes[index]].putPackets(dstOffsets[index], inputOffsetByBytes, size,
+                                                    threadIdx.x, blockDim.x, flag);
+  }
+}
+
+template <typename T, typename PacketType>
+MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* output, uint32_t outputOffsetByBytes, T* input,
+                                                  uint32_t inputOffsetByBytes, DeviceHandle<SmChannel>* smChannels,
+                                                  uint8_t* dstChannelIndexes, uint32_t* dstOffsets,
+                                                  uint32_t* srcOffsets, int nDstChannels, int nSrcs, size_t size,
+                                                  uint32_t flag) {
+  size_t nPackets = size * 2 / sizeof(PacketType);
+  uint32_t srcOffset = inputOffsetByBytes / sizeof(PacketValType<PacketType>);
+  uint32_t dstOffset = outputOffsetByBytes / sizeof(PacketValType<PacketType>);
+  PacketValType<PacketType>* src = (PacketValType<PacketType>*)input + srcOffset;
+  PacketValType<PacketType>* dst = (PacketValType<PacketType>*)output + dstOffset;
+  for (int idx = threadIdx.x; idx < nPackets; idx += blockDim.x) {
+    PacketValType<PacketType> data = {};
+    for (int index = 0; index < nSrcs; ++index) {
+      PacketType* pkt = (PacketType*)input + srcOffsets[index] / sizeof(PacketType);
+      PacketValType<PacketType> val = pkt[idx].read(flag);
+      data = add_vectors<T>(data, val);
+    }
+    data = add_vectors<T>(data, src[idx]);
+    dst[idx] = data;
+
+    PacketType pkt(data, flag);
+    for (int index = 0; index < nDstChannels; ++index) {
+      smChannels[dstChannelIndexes[index]].write(dstOffsets[index] / sizeof(PacketValType<PacketType>) + idx, pkt);
+    }
+  }
+}
+
+template <typename PacketType>
+MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, uint32_t dstOffset, uint32_t srcOffset, size_t size,
+                                            uint32_t flag) {
+  PacketType* srcPackets = (PacketType*)src;
+  PacketValType<PacketType>* result = (PacketValType<PacketType>*)dst;
+  size_t nPackets = size * 2 / sizeof(PacketType);
+  for (size_t idx = threadIdx.x; idx < nPackets; idx += blockDim.x) {
+    PacketValType<PacketType> data = srcPackets[idx].read(flag);
+    result[idx] = data;
+  }
+}
+
+template <typename T, typename PacketType>
 __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* input, T* output, T* scratch,
-                                DeviceExecutionPlan* plan) {
+                                DeviceExecutionPlan* plan, uint32_t flag) {
   extern __shared__ int sharedMem[];
   int bid = blockIdx.x;
   int tid = threadIdx.x;
@@ -242,20 +294,39 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu
                    operations[i].dstOffset, operations[i].size);
         break;
       case OperationType::READ_REDUCE_COPY_SEND:
-        src = getBuffer(input, output, scratch, operations[i].srcBufferType);
         dst = getBuffer(input, output, scratch, operations[i].dstBufferType);
-        handleReadReduceCopySend(src, operations[i].srcOffset, dst, operations[i].dstOffset, smChannels,
-                                 operations[i].inputChannelIndexes, operations[i].outputChannelIndexes,
-                                 operations[i].inputOffsets, operations[i].outputOffsets, operations[i].nInputChannels,
-                                 operations[i].nOutputChannels, operations[i].size);
+        src = getBuffer(input, output, scratch, operations[i].srcBufferType);
+        handleReadReduceCopySend(dst, operations[i].dstOffset, src, operations[i].srcOffset, smChannels,
+                                 operations[i].outputChannelIndexes, operations[i].inputChannelIndexes,
+                                 operations[i].outputOffsets, operations[i].inputOffsets, operations[i].nOutputChannels,
+                                 operations[i].nInputChannels, operations[i].size);
         break;
       case OperationType::READ_REDUCE_COPY:
+        dst = getBuffer(input, output, scratch, operations[i].dstBufferType);
         src = getBuffer(input, output, scratch, operations[i].srcBufferType);
+        handleReadReduceCopySend(dst, operations[i].dstOffset, src, operations[i].srcOffset, smChannels,
+                                 operations[i].outputChannelIndexes, operations[i].inputChannelIndexes,
+                                 operations[i].outputOffsets, operations[i].inputOffsets, operations[i].nOutputChannels,
+                                 operations[i].nInputChannels, operations[i].size, false);
+        break;
+      case OperationType::PUT_PACKET:
+        handlePutPacket<PacketType>(operations[i].srcOffset, smChannels, operations[i].outputChannelIndexes,
+                                    operations[i].outputOffsets, operations[i].nOutputChannels, operations[i].size,
+                                    flag);
+        break;
+      case OperationType::REDUCE_SEND_PACKET:
         dst = getBuffer(input, output, scratch, operations[i].dstBufferType);
-        handleReadReduceCopySend(src, operations[i].srcOffset, dst, operations[i].dstOffset, smChannels,
-                                 operations[i].inputChannelIndexes, operations[i].outputChannelIndexes,
-                                 operations[i].inputOffsets, operations[i].outputOffsets, operations[i].nInputChannels,
-                                 operations[i].nOutputChannels, operations[i].size, false);
+        src = getBuffer(input, output, scratch, operations[i].srcBufferType);
+        handleReduceSendPacket<T, PacketType>(dst, operations[i].dstOffset, src, operations[i].srcOffset, smChannels,
+                                              operations[i].outputChannelIndexes, operations[i].outputOffsets,
+                                              operations[i].inputOffsets, operations[i].nOutputChannels,
+                                              operations[i].nInputChannels, operations[i].size, flag);
+        break;
+      case OperationType::COPY_PACKET:
+        dst = getBuffer(input, output, scratch, operations[i].dstBufferType);
+        src = getBuffer(input, output, scratch, operations[i].srcBufferType);
+        handleCopyPacket<PacketType>(dst, src, operations[i].dstOffset, operations[i].srcOffset, operations[i].size,
+                                     flag);
         break;
       default:
         break;
@@ -267,30 +338,34 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu
 class ExecutionKernel {
  public:
 #if defined(MSCCLPP_DEVICE_HIP)
+  template <typename PacketType>
   static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
-                           DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream) {
+                           DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream,
+                           uint32_t flag = 0) {
     switch (dataType) {
       case DataType::INT32:
-        executionKernel<<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (int32_t*)src, (int32_t*)dst,
-                                                                            (int32_t*)scratch, plan);
+        executionKernel<int32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
+            rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, plan, flag);
         break;
       case DataType::UINT32:
-        executionKernel<<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
-            rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan);
+        executionKernel<uint32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
+            rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan, flag);
         break;
       case DataType::FLOAT16:
-        executionKernel
-            <<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (half*)src, (half*)dst, (half*)scratch, plan);
+        executionKernel<half, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
+            rank, (half*)src, (half*)dst, (half*)scratch, plan, flag);
         break;
      case DataType::FLOAT32:
-        executionKernel
-            <<<nthreadblocks, nthreads, sharedMemSize, stream>>>(rank, (float*)src, (float*)dst, (float*)scratch, plan);
+        executionKernel<float, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
+            rank, (float*)src, (float*)dst, (float*)scratch, plan, flag);
         break;
     }
   }
 #else   // !defined(MSCCLPP_DEVICE_HIP)
+  template <typename PacketType>
   static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
-                           DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream);
+                           DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream,
+                           uint32_t flag = 0);
 #endif  // !defined(MSCCLPP_DEVICE_HIP)
 };
 }  // namespace mscclpp
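
Usage sketch (not part of the patch): how the new packetType argument added above is
driven from the Python bindings. The construction details here are illustrative
assumptions -- comm, nranks_per_node, plan_path, the device pointers, and the stream
handle are placeholders, not values defined by this patch; only the trailing
packetType argument is new, and it defaults to PacketType.LL16 when omitted.

    from mscclpp import DataType, Executor, ExecutionPlan, PacketType

    # Assumed to already exist: a bootstrapped communicator, two GPU buffers
    # exposed as integer device pointers, and a CUDA stream handle.
    executor = Executor(comm, nranks_per_node)
    plan = ExecutionPlan(plan_path)
    executor.execute(
        rank,
        sendbuff_ptr,     # uintptr_t device pointer
        recvbuff_ptr,     # uintptr_t device pointer
        sendbuff_size,
        recvbuff_size,
        DataType.float16,
        nthreads,
        plan,
        stream_ptr,       # cudaStream_t passed as uintptr_t
        PacketType.LL8,   # new in this patch; defaults to PacketType.LL16
    )

Each call to execute() bumps the executor's internal flag (the static ++flag in
Executor::Impl::launchKernel above), so packets written by one launch are not
mistaken for packets left in the scratch buffer by a previous launch.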