From 223603e5c1b62dfa1f75a0b09cc9bd4d1c4bcde8 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Thu, 24 Aug 2023 03:37:49 +0000 Subject: [PATCH 1/9] perf improvement --- include/mscclpp/semaphore_device.hpp | 5 +++++ include/mscclpp/sm_channel_device.hpp | 2 ++ test/mscclpp-test/allgather_test.cu | 2 +- test/mscclpp-test/allreduce_test.cu | 20 +++++++++++--------- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/include/mscclpp/semaphore_device.hpp b/include/mscclpp/semaphore_device.hpp index 292a8b495..87aa67f4b 100644 --- a/include/mscclpp/semaphore_device.hpp +++ b/include/mscclpp/semaphore_device.hpp @@ -44,6 +44,11 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { *remoteInboundSemaphoreId = semaphoreGetLocal(); } + __forceinline__ __device__ void signalWithoutFence() { + semaphoreIncrement(); + *remoteInboundSemaphoreId = semaphoreGetLocal(); + } + /// Signal the remote device for copied packets. /// /// Unlike @ref signal(), this function provides no guarantee on the completion of memory operations. This is diff --git a/include/mscclpp/sm_channel_device.hpp b/include/mscclpp/sm_channel_device.hpp index 353183835..090fd7f77 100644 --- a/include/mscclpp/sm_channel_device.hpp +++ b/include/mscclpp/sm_channel_device.hpp @@ -312,6 +312,8 @@ struct SmChannelDeviceHandle { /// __forceinline__ __device__ void signal() { semaphore_.signal(); } + __forceinline__ __device__ void signalWithoutFence() { semaphore_.signalWithoutFence(); } + /// Signal the remote semaphore for copied packets. /// /// Unlike @ref signal(), this function provides no guarantee on the completion of memory operations. This is diff --git a/test/mscclpp-test/allgather_test.cu b/test/mscclpp-test/allgather_test.cu index 387c1f956..bb84584b0 100644 --- a/test/mscclpp-test/allgather_test.cu +++ b/test/mscclpp-test/allgather_test.cu @@ -105,7 +105,7 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk sizeForThisBlock += lastChunkSize; } if (threadIdx.x == 0 && peerLocalBlockIdx == 0) { - constSmChans[peerIdx].signal(); + constSmChans[peerIdx].signalWithoutFence(); constSmChans[peerIdx].wait(); } deviceSyncer.sync(nBlocks); diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index 12fee9285..b3c85eb14 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -75,6 +75,7 @@ __device__ void vectorSumSingleBlock(int* dst, int* src, size_t nElem) { } __device__ mscclpp::DeviceSyncer deviceSyncer; +__device__ mscclpp::DeviceSyncer allGatherDeviceSyncer; __device__ mscclpp::DeviceSyncer reduceScatterDeviceSyncer; __device__ mscclpp::DeviceSyncer ibDeviceSyncer; @@ -291,7 +292,7 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4* buff4 = (int4*)buff; for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { - smChans[peerIdx].signal(); + smChans[peerIdx].signalWithoutFence(); } for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { smChans[peerIdx].wait(); @@ -303,7 +304,7 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4 sum = make_int4(0, 0, 0, 0); for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { - int4 val = smChans[peerIdx].read(indexOffset4 + idx); + int4 val = smChans[(localRankIndexInNode + peerIdx) % nPeer].read(indexOffset4 + idx); sum.w += val.w; sum.x += val.x; sum.y += val.y; @@ -319,7 +320,7 @@ __device__ void 
localReduceScatterSm(int* buff, int* scratch, int rank, int nRan for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nLastInts; idx += blockDim.x * nBlocks) { int sum = 0; for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { - int val = smChans[peerIdx].read(indexOffset + nInt4 * 4 + idx); + int val = smChans[(localRankIndexInNode + peerIdx) % nPeer].read(indexOffset + nInt4 * 4 + idx); sum += val; } buff[indexOffset + nInt4 * 4 + idx] += sum; @@ -351,7 +352,7 @@ __device__ void reduceScatterSm(int* buff, int* scratch, int rank, int nRanksPer int nBlocksRemain = gridDim.x - nBlocksForReduceScatter; DeviceHandle& proxyChan = constDevFstRoundChans[peer]; if (peerNodeId == rank / nRanksPerNode) { - localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize, nBlocksForReduceScatter); + localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, chunkSize, chunkSize, gridDim.x); return; } @@ -445,10 +446,10 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk sizeForThisBlock += lastChunkSize; } if (threadIdx.x == 0 && peerLocalBlockIdx == 0) { - constSmInPlaceChans[peerIdx].signal(); + constSmInPlaceChans[peerIdx].signalWithoutFence(); constSmInPlaceChans[peerIdx].wait(); } - deviceSyncer.sync(nBlocks); + allGatherDeviceSyncer.sync(nBlocks); size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; constSmInPlaceChans[peerIdx].get(offset + offsetForThisBlock, sizeForThisBlock, threadIdx.x, blockDim.x); } @@ -470,7 +471,7 @@ __device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t n int peerNodeId = peerRank / nRanksPerNode; int peer = (peerRank < rank) ? peerRank : peerRank - 1; DeviceHandle& proxyChan = constDevSndRoundChans[peer]; - const size_t nBlocksForLocalAllGather = gridDim.x; + const size_t nBlocksForLocalAllGather = gridDim.x / (nRanksPerNode - 1) * (nRanksPerNode - 1); const size_t rankChunkSize = nelemsPerGPU * sizeof(int); const int startRankIndexInLocalNode = (rank / nRanksPerNode) * nRanksPerNode; const int startRankIndexInPeerNode = (peerRank / nRanksPerNode) * nRanksPerNode; @@ -495,7 +496,7 @@ __device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t n proxyChan.wait(); proxyChan.flush(); } - deviceSyncer.sync(nBlocksForLocalAllGather); + deviceSyncer.sync(gridDim.x); // Step 2 if (threadIdx.x == 0 && blockIdx.x == 0) { proxyChan.putWithSignal(rank * nelemsPerGPU * sizeof(int) + step1Bytes, step2Bytes); @@ -506,7 +507,7 @@ __device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t n proxyChan.wait(); proxyChan.flush(); } - deviceSyncer.sync(nBlocksForLocalAllGather); + deviceSyncer.sync(gridDim.x); // Step 3 localAllGatherSm(rank, nRanksPerNode, startRankIndexInPeerNode, step1Bytes, rankChunkSize, step2Bytes, nBlocksForLocalAllGather); @@ -882,6 +883,7 @@ void AllReduceTestColl::setupCollTest(size_t size) { mscclpp::DeviceSyncer syncer = {}; uint64_t initFlag = 1; CUDATHROW(cudaMemcpyToSymbol(deviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); + CUDATHROW(cudaMemcpyToSymbol(allGatherDeviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); CUDATHROW(cudaMemcpyToSymbol(reduceScatterDeviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); CUDATHROW(cudaMemcpyToSymbol(ibDeviceSyncer, &syncer, sizeof(mscclpp::DeviceSyncer))); CUDATHROW(cudaMemcpyToSymbol(globalFlag, &initFlag, sizeof(uint64_t))); From 6378cf0b411f7a479dd330da3b7336a6b1f72e85 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Sun, 27 Aug 2023 
09:13:34 +0000 Subject: [PATCH 2/9] WIP --- include/mscclpp/concurrency.hpp | 26 ++++++++++ test/mscclpp-test/allreduce_test.cu | 79 +++++++++++++++++++---------- 2 files changed, 77 insertions(+), 28 deletions(-) diff --git a/include/mscclpp/concurrency.hpp b/include/mscclpp/concurrency.hpp index 8b6f59263..0b6a271d8 100644 --- a/include/mscclpp/concurrency.hpp +++ b/include/mscclpp/concurrency.hpp @@ -48,6 +48,32 @@ struct DeviceSyncer { // the flag is flipped. __syncthreads(); } + + __forceinline__ __device__ void syncWithoutFence(int blockNum) { + int maxOldCnt = blockNum - 1; + if (blockNum == 1) { + __syncthreads(); + return; + } + if (threadIdx.x == 0) { + int tmpIsAdd = isAdd_ ^ 1; + if (tmpIsAdd) { + if (atomicAdd(&count_, 1) == maxOldCnt) { + flag_ = 1; + } + POLL_MAYBE_JAILBREAK(!flag_, 1000000000); + } else { + if (atomicSub(&count_, 1) == 1) { + flag_ = 0; + } + POLL_MAYBE_JAILBREAK(flag_, 1000000000); + } + isAdd_ = tmpIsAdd; + } + // We need sync here because only a single thread is checking whether + // the flag is flipped. + __syncthreads(); + } #endif private: diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index b3c85eb14..61d4ab768 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -291,39 +291,34 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4* buff4 = (int4*)buff; - for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { - smChans[peerIdx].signalWithoutFence(); + int tid = threadIdx.x + blockIdx.x * blockDim.x; + if (tid < nPeer) { + smChans[tid].signalWithoutFence(); + smChans[tid].wait(); } - for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { - smChans[peerIdx].wait(); + if (tid == nBlocks * blockDim.x - 1) { + __threadfence_system(); } - reduceScatterDeviceSyncer.sync(nBlocks); + reduceScatterDeviceSyncer.syncWithoutFence(nBlocks); const size_t nInt4 = nelems / 4; - for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * nBlocks) { - int4 sum = make_int4(0, 0, 0, 0); - - for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { - int4 val = smChans[(localRankIndexInNode + peerIdx) % nPeer].read(indexOffset4 + idx); - sum.w += val.w; - sum.x += val.x; - sum.y += val.y; - sum.z += val.z; + for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + int4 val; + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * nBlocks) { + val = smChans[(localRankIndexInNode + peerIdx) % nPeer].read(indexOffset4 + idx); + buff4[indexOffset4 + idx].w += val.w; + buff4[indexOffset4 + idx].x += val.x; + buff4[indexOffset4 + idx].y += val.y; + buff4[indexOffset4 + idx].z += val.z; } - buff4[indexOffset4 + idx].w += sum.w; - buff4[indexOffset4 + idx].x += sum.x; - buff4[indexOffset4 + idx].y += sum.y; - buff4[indexOffset4 + idx].z += sum.z; } const size_t nLastInts = nelems % 4; - for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nLastInts; idx += blockDim.x * nBlocks) { - int sum = 0; - for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nLastInts; idx += blockDim.x * nBlocks) { int val = smChans[(localRankIndexInNode + peerIdx) % nPeer].read(indexOffset + nInt4 * 4 + idx); - sum += val; + buff[indexOffset + nInt4 * 4 + idx] += val; } - buff[indexOffset + nInt4 * 4 + idx] += 
sum; } } @@ -413,6 +408,26 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk uint64_t rankChunkSize, uint64_t size, size_t nBlocks) { if (nRanksPerNode == 1) return; if (blockIdx.x >= nBlocks) return; + + // int tid = threadIdx.x + blockIdx.x * blockDim.x; + // const int nPeer = nRanksPerNode - 1; + + // if (tid < nPeer) { + // constSmInPlaceChans[tid].signalWithoutFence(); + // constSmInPlaceChans[tid].wait(); + // } + // if (tid == nBlocks * blockDim.x - 1) { + // __threadfence_system(); + // } + // allGatherDeviceSyncer.syncWithoutFence(nBlocks); + // for (int i = 0; i < nPeer; ++i) { + // int peerIdx = (i + rank) % nPeer; + // const size_t rankLocalIndex = rank % nRanksPerNode; + // const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1); + // size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; + // constSmInPlaceChans[peerIdx].get(offset, size, tid, blockDim.x * nBlocks); + // } + const size_t nPeer = nRanksPerNode - 1; const size_t peerIdx = blockIdx.x % nPeer; const size_t nBlockForThisPeer = nBlocks / nPeer + (nBlocks % nPeer > peerIdx ? 1 : 0); @@ -449,6 +464,10 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk constSmInPlaceChans[peerIdx].signalWithoutFence(); constSmInPlaceChans[peerIdx].wait(); } + int tid = threadIdx.x + blockIdx.x * blockDim.x; + if (tid == nBlocks * blockDim.x - 1) { + __threadfence_system(); + } allGatherDeviceSyncer.sync(nBlocks); size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; constSmInPlaceChans[peerIdx].get(offset + offsetForThisBlock, sizeForThisBlock, threadIdx.x, blockDim.x); @@ -477,7 +496,7 @@ __device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t n const int startRankIndexInPeerNode = (peerRank / nRanksPerNode) * nRanksPerNode; if (peerNodeId == rank / nRanksPerNode) { - localAllGatherSm(rank, nRanksPerNode, 0, 0, rankChunkSize, rankChunkSize, nBlocksForLocalAllGather); + localAllGatherSm(rank, nRanksPerNode, 0, 0, rankChunkSize, rankChunkSize, gridDim.x); return; } @@ -791,9 +810,13 @@ __global__ void __launch_bounds__(1024) __global__ void allreduce4(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, size_t nelems) { - reduceScatterSm(buff, scratch, rank, nRanksPerNode, worldSize, nelems); - deviceSyncer.sync(gridDim.x); - allGatherSm(rank, worldSize, nRanksPerNode, nelems / worldSize); + // reduceScatterSm(buff, scratch, rank, nRanksPerNode, worldSize, nelems); + // deviceSyncer.sync(gridDim.x); + // allGatherSm(rank, worldSize, nRanksPerNode, nelems / worldSize); + localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, nelems / worldSize, nelems / worldSize, gridDim.x); + deviceSyncer.syncWithoutFence(gridDim.x); + localAllGatherSm(rank, nRanksPerNode, 0, 0, nelems / worldSize * sizeof(int), nelems / worldSize * sizeof(int), + gridDim.x); } class AllReduceTestColl : public BaseTestColl { @@ -828,7 +851,7 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { tmpBuff = scratchBuff; nThreadsPerBlock = 1024; } else if (kernelNum == 4) { - nBlocks = 45; + nBlocks = 49; tmpBuff = scratchBuff; nThreadsPerBlock = 512; } else { From bf0c7356c51dd8a9807594a286f821a208034e6c Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Mon, 28 Aug 2023 02:50:53 +0000 Subject: [PATCH 3/9] update --- include/mscclpp/semaphore_device.hpp | 2 +- test/mscclpp-test/allreduce_test.cu | 87 
+++++++++++++++++----------- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/include/mscclpp/semaphore_device.hpp b/include/mscclpp/semaphore_device.hpp index 87aa67f4b..4463b505e 100644 --- a/include/mscclpp/semaphore_device.hpp +++ b/include/mscclpp/semaphore_device.hpp @@ -39,7 +39,7 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { __forceinline__ __device__ void signal() { // This fence ensures that preceding writes are visible on the peer GPU before the incremented // `outboundSemaphoreId` is visible. - __threadfence_system(); + asm volatile("fence.acq_rel.sys;" ::: "memory"); semaphoreIncrement(); *remoteInboundSemaphoreId = semaphoreGetLocal(); } diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index 61d4ab768..1792ee54e 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -292,12 +292,16 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4* buff4 = (int4*)buff; int tid = threadIdx.x + blockIdx.x * blockDim.x; + if (tid == 0) { + asm volatile("fence.acq_rel.sys;" ::: "memory"); + } + __syncwarp(); if (tid < nPeer) { smChans[tid].signalWithoutFence(); - smChans[tid].wait(); } - if (tid == nBlocks * blockDim.x - 1) { - __threadfence_system(); + int waitStart = nBlocks * blockDim.x - nPeer; + if (tid >= waitStart && tid < nBlocks * blockDim.x) { + smChans[tid - waitStart].wait(); } reduceScatterDeviceSyncer.syncWithoutFence(nBlocks); @@ -409,25 +413,6 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk if (nRanksPerNode == 1) return; if (blockIdx.x >= nBlocks) return; - // int tid = threadIdx.x + blockIdx.x * blockDim.x; - // const int nPeer = nRanksPerNode - 1; - - // if (tid < nPeer) { - // constSmInPlaceChans[tid].signalWithoutFence(); - // constSmInPlaceChans[tid].wait(); - // } - // if (tid == nBlocks * blockDim.x - 1) { - // __threadfence_system(); - // } - // allGatherDeviceSyncer.syncWithoutFence(nBlocks); - // for (int i = 0; i < nPeer; ++i) { - // int peerIdx = (i + rank) % nPeer; - // const size_t rankLocalIndex = rank % nRanksPerNode; - // const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1); - // size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; - // constSmInPlaceChans[peerIdx].get(offset, size, tid, blockDim.x * nBlocks); - // } - const size_t nPeer = nRanksPerNode - 1; const size_t peerIdx = blockIdx.x % nPeer; const size_t nBlockForThisPeer = nBlocks / nPeer + (nBlocks % nPeer > peerIdx ? 
1 : 0); @@ -461,18 +446,43 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk sizeForThisBlock += lastChunkSize; } if (threadIdx.x == 0 && peerLocalBlockIdx == 0) { - constSmInPlaceChans[peerIdx].signalWithoutFence(); + constSmInPlaceChans[peerIdx].signal(); constSmInPlaceChans[peerIdx].wait(); } - int tid = threadIdx.x + blockIdx.x * blockDim.x; - if (tid == nBlocks * blockDim.x - 1) { - __threadfence_system(); - } allGatherDeviceSyncer.sync(nBlocks); size_t offset = rankChunkSize * (startRankChunkIndex + remoteRankLocalIndex) + offsetInRankChunk; constSmInPlaceChans[peerIdx].get(offset + offsetForThisBlock, sizeForThisBlock, threadIdx.x, blockDim.x); } +__device__ void localRingAllGatherSm(int rank, int nRanksPerNode, uint64_t size, size_t nBlocks) { + if (nRanksPerNode == 1) return; + if (blockIdx.x >= nBlocks) return; + + int tid = threadIdx.x + blockIdx.x * blockDim.x; + const int nPeer = nRanksPerNode - 1; + + // if (tid == 0) { + // asm volatile("fence.acq_rel.sys;" ::: "memory"); + // } + // __syncwarp(); + if (tid < nPeer) { + constSmInPlaceChans[tid].signal(); + // constSmInPlaceChans[tid].wait(); + } + int waitStart = nBlocks * blockDim.x - nPeer; + if (tid >= waitStart && tid < nBlocks * blockDim.x) { + constSmInPlaceChans[tid - waitStart].wait(); + } + allGatherDeviceSyncer.sync(nBlocks); + for (int i = 0; i < nPeer; ++i) { + int peerIdx = (i + rank) % nPeer; + const size_t rankLocalIndex = rank % nRanksPerNode; + const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1); + size_t offset = size * remoteRankLocalIndex; + constSmInPlaceChans[peerIdx].get(offset, size, tid, blockDim.x * nBlocks); + } +} + // This is an allgather4 equivalent __device__ void allGatherSm(int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU) { // this allgather is a pipelined and hierarchical one and only works for two nodes @@ -810,13 +820,16 @@ __global__ void __launch_bounds__(1024) __global__ void allreduce4(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, size_t nelems) { - // reduceScatterSm(buff, scratch, rank, nRanksPerNode, worldSize, nelems); - // deviceSyncer.sync(gridDim.x); - // allGatherSm(rank, worldSize, nRanksPerNode, nelems / worldSize); + reduceScatterSm(buff, scratch, rank, nRanksPerNode, worldSize, nelems); + deviceSyncer.sync(gridDim.x); + allGatherSm(rank, worldSize, nRanksPerNode, nelems / worldSize); +} + +__global__ void allreduce5(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, + size_t nelems) { localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, nelems / worldSize, nelems / worldSize, gridDim.x); deviceSyncer.syncWithoutFence(gridDim.x); - localAllGatherSm(rank, nRanksPerNode, 0, 0, nelems / worldSize * sizeof(int), nelems / worldSize * sizeof(int), - gridDim.x); + localRingAllGatherSm(rank, nRanksPerNode, nelems / worldSize * sizeof(int), gridDim.x); } class AllReduceTestColl : public BaseTestColl { @@ -854,6 +867,10 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { nBlocks = 49; tmpBuff = scratchBuff; nThreadsPerBlock = 512; + } else if (kernelNum == 5) { + nBlocks = 49; + tmpBuff = scratchBuff; + nThreadsPerBlock = 512; } else { nBlocks = std::max(args.nRanksPerNode - 1, 1) * BLOCKS_PER_PEER; tmpBuff = scratchPacketBuff; @@ -874,6 +891,9 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { else if (kernelNum == 4) allreduce4<<>>((int*)inputBuff, (int*)tmpBuff, 
resultBuff, rank, args.nRanksPerNode, worldSize, paramCount_); + else if (kernelNum == 5) + allreduce5<<>>((int*)inputBuff, (int*)tmpBuff, resultBuff, rank, + args.nRanksPerNode, worldSize, paramCount_); } void AllReduceTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { @@ -924,7 +944,8 @@ std::vector AllReduceTestColl::getKernelRestrictions() { true, 3, .alignedBytes = 16 * worldSize_ /*use ulong2 to transfer data*/, - }}; + }, + {5, "allreduce3", true, 3, .alignedBytes = 4 * worldSize_}}; } class AllReduceTestEngine : public BaseTestEngine { From ebb6e82cad0909d031b260fccaedc0dcbeb1b66e Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Mon, 28 Aug 2023 09:25:32 +0000 Subject: [PATCH 4/9] update --- include/mscclpp/concurrency.hpp | 26 ------------------------- include/mscclpp/semaphore_device.hpp | 6 +----- include/mscclpp/sm_channel_device.hpp | 2 -- test/mscclpp-test/allgather_test.cu | 2 +- test/mscclpp-test/allreduce_test.cu | 28 +++++++++------------------ 5 files changed, 11 insertions(+), 53 deletions(-) diff --git a/include/mscclpp/concurrency.hpp b/include/mscclpp/concurrency.hpp index 0b6a271d8..8b6f59263 100644 --- a/include/mscclpp/concurrency.hpp +++ b/include/mscclpp/concurrency.hpp @@ -48,32 +48,6 @@ struct DeviceSyncer { // the flag is flipped. __syncthreads(); } - - __forceinline__ __device__ void syncWithoutFence(int blockNum) { - int maxOldCnt = blockNum - 1; - if (blockNum == 1) { - __syncthreads(); - return; - } - if (threadIdx.x == 0) { - int tmpIsAdd = isAdd_ ^ 1; - if (tmpIsAdd) { - if (atomicAdd(&count_, 1) == maxOldCnt) { - flag_ = 1; - } - POLL_MAYBE_JAILBREAK(!flag_, 1000000000); - } else { - if (atomicSub(&count_, 1) == 1) { - flag_ = 0; - } - POLL_MAYBE_JAILBREAK(flag_, 1000000000); - } - isAdd_ = tmpIsAdd; - } - // We need sync here because only a single thread is checking whether - // the flag is flipped. - __syncthreads(); - } #endif private: diff --git a/include/mscclpp/semaphore_device.hpp b/include/mscclpp/semaphore_device.hpp index 4463b505e..2f948baed 100644 --- a/include/mscclpp/semaphore_device.hpp +++ b/include/mscclpp/semaphore_device.hpp @@ -40,11 +40,7 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { // This fence ensures that preceding writes are visible on the peer GPU before the incremented // `outboundSemaphoreId` is visible. asm volatile("fence.acq_rel.sys;" ::: "memory"); - semaphoreIncrement(); - *remoteInboundSemaphoreId = semaphoreGetLocal(); - } - - __forceinline__ __device__ void signalWithoutFence() { + // __threadfence_system(); semaphoreIncrement(); *remoteInboundSemaphoreId = semaphoreGetLocal(); } diff --git a/include/mscclpp/sm_channel_device.hpp b/include/mscclpp/sm_channel_device.hpp index 090fd7f77..353183835 100644 --- a/include/mscclpp/sm_channel_device.hpp +++ b/include/mscclpp/sm_channel_device.hpp @@ -312,8 +312,6 @@ struct SmChannelDeviceHandle { /// __forceinline__ __device__ void signal() { semaphore_.signal(); } - __forceinline__ __device__ void signalWithoutFence() { semaphore_.signalWithoutFence(); } - /// Signal the remote semaphore for copied packets. /// /// Unlike @ref signal(), this function provides no guarantee on the completion of memory operations. 
This is diff --git a/test/mscclpp-test/allgather_test.cu b/test/mscclpp-test/allgather_test.cu index bb84584b0..387c1f956 100644 --- a/test/mscclpp-test/allgather_test.cu +++ b/test/mscclpp-test/allgather_test.cu @@ -105,7 +105,7 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk sizeForThisBlock += lastChunkSize; } if (threadIdx.x == 0 && peerLocalBlockIdx == 0) { - constSmChans[peerIdx].signalWithoutFence(); + constSmChans[peerIdx].signal(); constSmChans[peerIdx].wait(); } deviceSyncer.sync(nBlocks); diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index 1792ee54e..ac059286d 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -292,24 +292,21 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4* buff4 = (int4*)buff; int tid = threadIdx.x + blockIdx.x * blockDim.x; - if (tid == 0) { - asm volatile("fence.acq_rel.sys;" ::: "memory"); - } - __syncwarp(); if (tid < nPeer) { - smChans[tid].signalWithoutFence(); + smChans[tid].signal(); } int waitStart = nBlocks * blockDim.x - nPeer; if (tid >= waitStart && tid < nBlocks * blockDim.x) { smChans[tid - waitStart].wait(); } - reduceScatterDeviceSyncer.syncWithoutFence(nBlocks); + reduceScatterDeviceSyncer.sync(nBlocks); const size_t nInt4 = nelems / 4; - for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + for (int index = 0; index < nPeer; ++index) { int4 val; + int peerIdx = (index + localRankIndexInNode) % nPeer; for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * nBlocks) { - val = smChans[(localRankIndexInNode + peerIdx) % nPeer].read(indexOffset4 + idx); + val = smChans[peerIdx].read(indexOffset4 + idx); buff4[indexOffset4 + idx].w += val.w; buff4[indexOffset4 + idx].x += val.x; buff4[indexOffset4 + idx].y += val.y; @@ -412,7 +409,6 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk uint64_t rankChunkSize, uint64_t size, size_t nBlocks) { if (nRanksPerNode == 1) return; if (blockIdx.x >= nBlocks) return; - const size_t nPeer = nRanksPerNode - 1; const size_t peerIdx = blockIdx.x % nPeer; const size_t nBlockForThisPeer = nBlocks / nPeer + (nBlocks % nPeer > peerIdx ? 1 : 0); @@ -461,13 +457,8 @@ __device__ void localRingAllGatherSm(int rank, int nRanksPerNode, uint64_t size, int tid = threadIdx.x + blockIdx.x * blockDim.x; const int nPeer = nRanksPerNode - 1; - // if (tid == 0) { - // asm volatile("fence.acq_rel.sys;" ::: "memory"); - // } - // __syncwarp(); if (tid < nPeer) { constSmInPlaceChans[tid].signal(); - // constSmInPlaceChans[tid].wait(); } int waitStart = nBlocks * blockDim.x - nPeer; if (tid >= waitStart && tid < nBlocks * blockDim.x) { @@ -476,8 +467,7 @@ __device__ void localRingAllGatherSm(int rank, int nRanksPerNode, uint64_t size, allGatherDeviceSyncer.sync(nBlocks); for (int i = 0; i < nPeer; ++i) { int peerIdx = (i + rank) % nPeer; - const size_t rankLocalIndex = rank % nRanksPerNode; - const int remoteRankLocalIndex = (peerIdx < rankLocalIndex ? peerIdx : peerIdx + 1); + const int remoteRankLocalIndex = (peerIdx < rank ? 
peerIdx : peerIdx + 1); size_t offset = size * remoteRankLocalIndex; constSmInPlaceChans[peerIdx].get(offset, size, tid, blockDim.x * nBlocks); } @@ -828,7 +818,7 @@ __global__ void allreduce4(int* buff, int* scratch, void* result, int rank, int __global__ void allreduce5(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, size_t nelems) { localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, nelems / worldSize, nelems / worldSize, gridDim.x); - deviceSyncer.syncWithoutFence(gridDim.x); + deviceSyncer.sync(gridDim.x); localRingAllGatherSm(rank, nRanksPerNode, nelems / worldSize * sizeof(int), gridDim.x); } @@ -868,9 +858,9 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { tmpBuff = scratchBuff; nThreadsPerBlock = 512; } else if (kernelNum == 5) { - nBlocks = 49; + nBlocks = 24; tmpBuff = scratchBuff; - nThreadsPerBlock = 512; + nThreadsPerBlock = 1024; } else { nBlocks = std::max(args.nRanksPerNode - 1, 1) * BLOCKS_PER_PEER; tmpBuff = scratchPacketBuff; From 8ba6eeb4042cb53a482b79ab3131ed1610f33dd5 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 29 Aug 2023 12:03:35 +0000 Subject: [PATCH 5/9] update --- test/mscclpp-test/allreduce_test.cu | 136 ++++++++++++++++++++++++---- test/mscclpp-test/common.cc | 3 + 2 files changed, 119 insertions(+), 20 deletions(-) diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index ac059286d..7912e1acf 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -291,11 +291,11 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4* buff4 = (int4*)buff; - int tid = threadIdx.x + blockIdx.x * blockDim.x; + const int tid = threadIdx.x + blockIdx.x * blockDim.x; if (tid < nPeer) { smChans[tid].signal(); } - int waitStart = nBlocks * blockDim.x - nPeer; + const int waitStart = nBlocks * blockDim.x - nPeer; if (tid >= waitStart && tid < nBlocks * blockDim.x) { smChans[tid - waitStart].wait(); } @@ -822,6 +822,73 @@ __global__ void allreduce5(int* buff, int* scratch, void* result, int rank, int localRingAllGatherSm(rank, nRanksPerNode, nelems / worldSize * sizeof(int), gridDim.x); } +__global__ void allreduce6(int* buff, int* scratch, int rank, int nRanksPerNode, int worldSize, size_t nelems) { + // This version of allreduce only works for single nodes + if (worldSize != nRanksPerNode) return; + const int nPeers = nRanksPerNode - 1; + const int nPkts = nelems / 2; + const int nelemsPerRank = nelems / worldSize; + const int nPktsPerRank = nelemsPerRank / 2; + // flag for packets. Initially 1 + const uint32_t flag = (uint32_t)globalFlag; + // thread block & channel info + const int nBlocksPerPeer = gridDim.x / nPeers; + const int localBlockIdx = blockIdx.x % nBlocksPerPeer; + const int peerIdx = blockIdx.x / nBlocksPerPeer; + DeviceHandle smChan = constSmOutOfPlaceChans[peerIdx]; + const int tid = threadIdx.x + localBlockIdx * blockDim.x; + // double buffering + size_t scratchBaseOffset = (flag & 1) ? 0 : nPkts * sizeof(mscclpp::LLPacket); + void* scratchBuff = (void*)((char*)scratch + scratchBaseOffset); + size_t scratchOffset = scratchBaseOffset + rank * nPktsPerRank * sizeof(mscclpp::LLPacket); + size_t scratchResultOffset = + (flag & 1) ? 
2 * nPkts * sizeof(mscclpp::LLPacket) : 3 * nPkts * sizeof(mscclpp::LLPacket); + size_t srcOffset = rank * nelemsPerRank * sizeof(int); + uint2* src = (uint2*)((char*)buff + srcOffset); + mscclpp::LLPacket* scratchResult = (mscclpp::LLPacket*)((char*)scratch + scratchResultOffset); + + // step 1: write to scratch buffer + smChan.putPackets(scratchOffset, srcOffset, nelemsPerRank * sizeof(int), tid, blockDim.x * nBlocksPerPeer, flag); + // deviceSyncer.sync(gridDim.x); + // if (threadIdx.x == 0) { + // smChan.signal(); + // smChan.wait(); + // } + // deviceSyncer.sync(gridDim.x); + // if (rank == 1 && threadIdx.x == 0 && blockIdx.x == 0) { + // for (int i = 0; i < nelems; ++i) { + // printf("scratch data index %d value %d flag %d\n", i, ((int*)scratchBuff)[2*i], ((int*)scratchBuff)[2*i+1]); + // } + // } + // step 2: get data from scratch buffer and reduce + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * gridDim.x) { + uint2 data = make_uint2(0, 0); + uint2 val; + for (int index = 0; index < nPeers; index++) { + const int remoteRank = index < rank ? index : index + 1; + mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)scratchBuff + remoteRank * nPktsPerRank; + val = dstPkt[idx].read(flag); + data.x += val.x; + data.y += val.y; + } + src[idx].x += data.x; + src[idx].y += data.y; + scratchResult[idx].write(data.x, data.y, flag); + } + // step 3: write result to a temp buffer and get data result from peer + mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)smChan.dst_ + scratchResultOffset); + const int remoteRank = peerIdx < rank ? peerIdx : peerIdx + 1; + uint2* result = (uint2*)((char*)buff + remoteRank * nelemsPerRank * sizeof(int)); + for (int idx = threadIdx.x + localBlockIdx * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * nBlocksPerPeer) { + uint2 data = dstPkt[idx].read(flag); + result[idx].x = data.x; + result[idx].y = data.y; + } + if (threadIdx.x == 0 && blockIdx.x == 0) { + globalFlag += 1; + } +} + class AllReduceTestColl : public BaseTestColl { public: AllReduceTestColl() = default; @@ -861,6 +928,10 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { nBlocks = 24; tmpBuff = scratchBuff; nThreadsPerBlock = 1024; + } else if (kernelNum == 6) { + nBlocks = 21; + tmpBuff = scratchPacketBuff; + nThreadsPerBlock = 512; } else { nBlocks = std::max(args.nRanksPerNode - 1, 1) * BLOCKS_PER_PEER; tmpBuff = scratchPacketBuff; @@ -884,6 +955,10 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { else if (kernelNum == 5) allreduce5<<>>((int*)inputBuff, (int*)tmpBuff, resultBuff, rank, args.nRanksPerNode, worldSize, paramCount_); + else if (kernelNum == 6) { + allreduce6<<>>((int*)inputBuff, (int*)tmpBuff, rank, args.nRanksPerNode, + worldSize, paramCount_); + } } void AllReduceTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { @@ -896,6 +971,9 @@ void AllReduceTestColl::initData(const TestArgs& args, std::vector sendBu for (size_t i = 0; i < recvCount_; i++) { dataHost[i] = worldSize * (worldSize - 1) / 2; } + // if (args.rank == 1) { + // printf("data is %d\n", dataHost[0]); + // } std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_); } @@ -935,7 +1013,8 @@ std::vector AllReduceTestColl::getKernelRestrictions() { 3, .alignedBytes = 16 * worldSize_ /*use ulong2 to transfer data*/, }, - {5, "allreduce3", true, 3, .alignedBytes = 4 * worldSize_}}; + {5, "allreduce5", false, 1, .alignedBytes = 4 * worldSize_}, + {6, "allreduce6", false, 
1, .alignedBytes = 4 * worldSize_}}; } class AllReduceTestEngine : public BaseTestEngine { @@ -972,7 +1051,7 @@ AllReduceTestEngine::AllReduceTestEngine(const TestArgs& args) : BaseTestEngine( inPlace_ = isInPlace(); } -bool AllReduceTestEngine::isUsePacket() const { return (args_.kernelNum == 2); } +bool AllReduceTestEngine::isUsePacket() const { return (args_.kernelNum == 2 || args_.kernelNum == 6); } bool AllReduceTestEngine::isInPlace() const { return (args_.kernelNum != 2); } @@ -996,6 +1075,12 @@ void AllReduceTestEngine::allocateBuffer() { getPacketBuff_ = mscclpp::allocSharedCuda(packetBuffNelem); putPacketBuff = putPacketBuff_.get(); getPacketBuff = getPacketBuff_.get(); + } else if (args_.kernelNum == 6) { + const size_t nPacket = (args_.maxBytes + sizeof(uint64_t) - 1) / sizeof(uint64_t); + // 2x for double-buffering, scratchBuff used to store original data and reduced results + const size_t scratchBuffNelem = nPacket * 2 /*original data & reduced result */ * 2 /* double buffering*/; + scratchPacketBuff_ = mscclpp::allocSharedCuda(scratchBuffNelem); + scratchPacketBuff = scratchPacketBuff_.get(); } expectedBuff_ = std::shared_ptr(new int[args_.maxBytes / sizeof(int)]); @@ -1011,22 +1096,33 @@ void AllReduceTestEngine::setupConnections() { std::vector> proxyChannels; const size_t nPacket = (args_.maxBytes + sizeof(uint64_t) - 1) / sizeof(uint64_t); - const size_t scratchPacketBuffBytes = - nPacket * std::max(args_.nRanksPerNode - 1, 1) * 2 * sizeof(mscclpp::LLPacket); - const size_t packetBuffBytes = nPacket * 2 * sizeof(mscclpp::LLPacket); - setupMeshConnections(smOutOfPlaceChannels_, proxyChannels, inputBuff_.get(), args_.maxBytes, putPacketBuff_.get(), - packetBuffBytes, getPacketBuff_.get(), packetBuffBytes, scratchPacketBuff_.get(), - scratchPacketBuffBytes); - - assert(smOutOfPlaceChannels_.size() < sizeof(constSmOutOfPlaceChans) / sizeof(DeviceHandle)); - assert(proxyChannels.size() < sizeof(constDevFstRoundChans) / sizeof(DeviceHandle)); - - std::vector> smChannelDeviceHandles(smOutOfPlaceChannels_.size()); - getChannelDeviceHandle(smOutOfPlaceChannels_, smChannelDeviceHandles); - CUDATHROW(cudaMemcpyToSymbol(constSmOutOfPlaceChans, smChannelDeviceHandles.data(), - sizeof(DeviceHandle) * smChannelDeviceHandles.size())); - CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, proxyChannels.data(), - sizeof(DeviceHandle) * proxyChannels.size())); + if (args_.kernelNum == 6) { + const size_t scratchPacketBuffBytes = nPacket * 2 * 2 * sizeof(mscclpp::LLPacket); + setupMeshConnections(smOutOfPlaceChannels_, inputBuff_.get(), args_.maxBytes, scratchPacketBuff_.get(), + scratchPacketBuffBytes); + std::vector> smChannelDeviceHandles(smOutOfPlaceChannels_.size()); + getChannelDeviceHandle(smOutOfPlaceChannels_, smChannelDeviceHandles); + CUDATHROW(cudaMemcpyToSymbol(constSmOutOfPlaceChans, smChannelDeviceHandles.data(), + sizeof(DeviceHandle) * smChannelDeviceHandles.size())); + } + if (args_.kernelNum == 2) { + const size_t scratchPacketBuffBytes = + nPacket * std::max(args_.nRanksPerNode - 1, 1) * 2 * sizeof(mscclpp::LLPacket); + const size_t packetBuffBytes = nPacket * 2 * sizeof(mscclpp::LLPacket); + setupMeshConnections(smOutOfPlaceChannels_, proxyChannels, inputBuff_.get(), args_.maxBytes, putPacketBuff_.get(), + packetBuffBytes, getPacketBuff_.get(), packetBuffBytes, scratchPacketBuff_.get(), + scratchPacketBuffBytes); + + assert(smOutOfPlaceChannels_.size() < sizeof(constSmOutOfPlaceChans) / sizeof(DeviceHandle)); + assert(proxyChannels.size() < sizeof(constDevFstRoundChans) 
/ sizeof(DeviceHandle)); + + std::vector> smChannelDeviceHandles(smOutOfPlaceChannels_.size()); + getChannelDeviceHandle(smOutOfPlaceChannels_, smChannelDeviceHandles); + CUDATHROW(cudaMemcpyToSymbol(constSmOutOfPlaceChans, smChannelDeviceHandles.data(), + sizeof(DeviceHandle) * smChannelDeviceHandles.size())); + CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, proxyChannels.data(), + sizeof(DeviceHandle) * proxyChannels.size())); + } } else { std::vector> fstRoundChannels; std::vector> sndRoundChannels; diff --git a/test/mscclpp-test/common.cc b/test/mscclpp-test/common.cc index e80531048..318590099 100644 --- a/test/mscclpp-test/common.cc +++ b/test/mscclpp-test/common.cc @@ -351,6 +351,9 @@ size_t BaseTestEngine::checkData() { CUDATHROW(cudaMemcpy(recvData.data(), recvBuff, recvBytes, cudaMemcpyDeviceToHost)); for (size_t i = 0; i < recvData.size(); i++) { if (recvData[i] != ((int*)expectedBuff)[i]) { + if (this->args_.rank == 1) + std::cout << "ERROR: recvData[" << i << "]=" << recvData[i] << " != expectedBuff[" << i + << "]=" << ((int*)expectedBuff)[i] << std::endl; nErrors++; } } From dd67c020001470c85980ce7481fe91b6996611ac Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 29 Aug 2023 14:20:12 +0000 Subject: [PATCH 6/9] update --- test/mscclpp-test/allreduce_test.cu | 34 ++++++++++++----------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index 7912e1acf..cdf1009b8 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -845,22 +845,10 @@ __global__ void allreduce6(int* buff, int* scratch, int rank, int nRanksPerNode, (flag & 1) ? 2 * nPkts * sizeof(mscclpp::LLPacket) : 3 * nPkts * sizeof(mscclpp::LLPacket); size_t srcOffset = rank * nelemsPerRank * sizeof(int); uint2* src = (uint2*)((char*)buff + srcOffset); - mscclpp::LLPacket* scratchResult = (mscclpp::LLPacket*)((char*)scratch + scratchResultOffset); // step 1: write to scratch buffer smChan.putPackets(scratchOffset, srcOffset, nelemsPerRank * sizeof(int), tid, blockDim.x * nBlocksPerPeer, flag); - // deviceSyncer.sync(gridDim.x); - // if (threadIdx.x == 0) { - // smChan.signal(); - // smChan.wait(); - // } - // deviceSyncer.sync(gridDim.x); - // if (rank == 1 && threadIdx.x == 0 && blockIdx.x == 0) { - // for (int i = 0; i < nelems; ++i) { - // printf("scratch data index %d value %d flag %d\n", i, ((int*)scratchBuff)[2*i], ((int*)scratchBuff)[2*i+1]); - // } - // } - // step 2: get data from scratch buffer and reduce + // step 2: get data from scratch buffer, reduce data and write result to remote scratch buffer for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * gridDim.x) { uint2 data = make_uint2(0, 0); uint2 val; @@ -871,16 +859,22 @@ __global__ void allreduce6(int* buff, int* scratch, int rank, int nRanksPerNode, data.x += val.x; data.y += val.y; } - src[idx].x += data.x; - src[idx].y += data.y; - scratchResult[idx].write(data.x, data.y, flag); + data.x += src[idx].x; + data.y += src[idx].y; + src[idx].x = data.x; + src[idx].y = data.y; + for (int index = 0; index < nPeers; index++) { + mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)constSmOutOfPlaceChans[index].dst_ + scratchResultOffset); + dstPkt[idx + rank * nPktsPerRank].write(data.x, data.y, flag); + } } - // step 3: write result to a temp buffer and get data result from peer - mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)smChan.dst_ + scratchResultOffset); + // step 
3: get data result from scratch buffer const int remoteRank = peerIdx < rank ? peerIdx : peerIdx + 1; + mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)scratch + scratchResultOffset); + const int dstOffset = remoteRank * nPktsPerRank; uint2* result = (uint2*)((char*)buff + remoteRank * nelemsPerRank * sizeof(int)); for (int idx = threadIdx.x + localBlockIdx * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * nBlocksPerPeer) { - uint2 data = dstPkt[idx].read(flag); + uint2 data = dstPkt[idx + dstOffset].read(flag); result[idx].x = data.x; result[idx].y = data.y; } @@ -929,7 +923,7 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { tmpBuff = scratchBuff; nThreadsPerBlock = 1024; } else if (kernelNum == 6) { - nBlocks = 21; + nBlocks = 28; tmpBuff = scratchPacketBuff; nThreadsPerBlock = 512; } else { From 55585942f56b9f5a0814bcd6c66e362727c344b1 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Wed, 30 Aug 2023 03:14:52 +0000 Subject: [PATCH 7/9] update --- test/mscclpp-test/allreduce_test.cu | 21 +++++++++++---------- test/mscclpp-test/common.cc | 3 --- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index cdf1009b8..09bc9457f 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -822,7 +822,8 @@ __global__ void allreduce5(int* buff, int* scratch, void* result, int rank, int localRingAllGatherSm(rank, nRanksPerNode, nelems / worldSize * sizeof(int), gridDim.x); } -__global__ void allreduce6(int* buff, int* scratch, int rank, int nRanksPerNode, int worldSize, size_t nelems) { +__global__ void allreduce6(int* buff, int* scratch, void* resultBuff, int rank, int nRanksPerNode, int worldSize, + size_t nelems) { // This version of allreduce only works for single nodes if (worldSize != nRanksPerNode) return; const int nPeers = nRanksPerNode - 1; @@ -845,24 +846,24 @@ __global__ void allreduce6(int* buff, int* scratch, int rank, int nRanksPerNode, (flag & 1) ? 2 * nPkts * sizeof(mscclpp::LLPacket) : 3 * nPkts * sizeof(mscclpp::LLPacket); size_t srcOffset = rank * nelemsPerRank * sizeof(int); uint2* src = (uint2*)((char*)buff + srcOffset); + uint2* dst = (uint2*)((char*)resultBuff + srcOffset); // step 1: write to scratch buffer smChan.putPackets(scratchOffset, srcOffset, nelemsPerRank * sizeof(int), tid, blockDim.x * nBlocksPerPeer, flag); // step 2: get data from scratch buffer, reduce data and write result to remote scratch buffer for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * gridDim.x) { uint2 data = make_uint2(0, 0); - uint2 val; for (int index = 0; index < nPeers; index++) { const int remoteRank = index < rank ? index : index + 1; mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)scratchBuff + remoteRank * nPktsPerRank; - val = dstPkt[idx].read(flag); + uint2 val = dstPkt[idx].read(flag); data.x += val.x; data.y += val.y; } data.x += src[idx].x; data.y += src[idx].y; - src[idx].x = data.x; - src[idx].y = data.y; + dst[idx].x = data.x; + dst[idx].y = data.y; for (int index = 0; index < nPeers; index++) { mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)constSmOutOfPlaceChans[index].dst_ + scratchResultOffset); dstPkt[idx + rank * nPktsPerRank].write(data.x, data.y, flag); @@ -872,7 +873,7 @@ __global__ void allreduce6(int* buff, int* scratch, int rank, int nRanksPerNode, const int remoteRank = peerIdx < rank ? 
peerIdx : peerIdx + 1; mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)((char*)scratch + scratchResultOffset); const int dstOffset = remoteRank * nPktsPerRank; - uint2* result = (uint2*)((char*)buff + remoteRank * nelemsPerRank * sizeof(int)); + uint2* result = (uint2*)((char*)resultBuff + remoteRank * nelemsPerRank * sizeof(int)); for (int idx = threadIdx.x + localBlockIdx * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * nBlocksPerPeer) { uint2 data = dstPkt[idx + dstOffset].read(flag); result[idx].x = data.x; @@ -923,7 +924,7 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { tmpBuff = scratchBuff; nThreadsPerBlock = 1024; } else if (kernelNum == 6) { - nBlocks = 28; + nBlocks = 21; tmpBuff = scratchPacketBuff; nThreadsPerBlock = 512; } else { @@ -950,8 +951,8 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { allreduce5<<>>((int*)inputBuff, (int*)tmpBuff, resultBuff, rank, args.nRanksPerNode, worldSize, paramCount_); else if (kernelNum == 6) { - allreduce6<<>>((int*)inputBuff, (int*)tmpBuff, rank, args.nRanksPerNode, - worldSize, paramCount_); + allreduce6<<>>((int*)inputBuff, (int*)tmpBuff, resultBuff, rank, + args.nRanksPerNode, worldSize, paramCount_); } } @@ -1047,7 +1048,7 @@ AllReduceTestEngine::AllReduceTestEngine(const TestArgs& args) : BaseTestEngine( bool AllReduceTestEngine::isUsePacket() const { return (args_.kernelNum == 2 || args_.kernelNum == 6); } -bool AllReduceTestEngine::isInPlace() const { return (args_.kernelNum != 2); } +bool AllReduceTestEngine::isInPlace() const { return (args_.kernelNum != 2 && args_.kernelNum != 6); } void AllReduceTestEngine::allocateBuffer() { inputBuff_ = mscclpp::allocSharedCuda(args_.maxBytes / sizeof(int)); diff --git a/test/mscclpp-test/common.cc b/test/mscclpp-test/common.cc index 318590099..e80531048 100644 --- a/test/mscclpp-test/common.cc +++ b/test/mscclpp-test/common.cc @@ -351,9 +351,6 @@ size_t BaseTestEngine::checkData() { CUDATHROW(cudaMemcpy(recvData.data(), recvBuff, recvBytes, cudaMemcpyDeviceToHost)); for (size_t i = 0; i < recvData.size(); i++) { if (recvData[i] != ((int*)expectedBuff)[i]) { - if (this->args_.rank == 1) - std::cout << "ERROR: recvData[" << i << "]=" << recvData[i] << " != expectedBuff[" << i - << "]=" << ((int*)expectedBuff)[i] << std::endl; nErrors++; } } From dc968bad834e7e820e1dd8f1a5583c559fb0783d Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Thu, 31 Aug 2023 11:27:32 +0000 Subject: [PATCH 8/9] update --- .azure-pipelines/integration-test.yml | 2 + include/mscclpp/semaphore_device.hpp | 3 +- test/deploy/perf_ndmv4.jsonl | 4 ++ test/mscclpp-test/allreduce_test.cu | 53 ++++++++++++++++++++++++++- 4 files changed, 58 insertions(+), 4 deletions(-) diff --git a/.azure-pipelines/integration-test.yml b/.azure-pipelines/integration-test.yml index c7491569e..4d96581ad 100644 --- a/.azure-pipelines/integration-test.yml +++ b/.azure-pipelines/integration-test.yml @@ -87,6 +87,8 @@ jobs: mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 2 -o output.jsonl mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 3 -o output.jsonl mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 1K -e 1G -f 2 -k 4 -o output.jsonl + mpirun -np 8 --bind-to numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 12M -e 48M -i 3145728 2 -k 5 -o output.jsonl + mpirun -np 8 --bind-to 
numa -x MSCCLPP_DEBUG=WARN ./build/test/mscclpp-test/allreduce_test_perf -b 24K -e 768K -i 24576 -k 6 -w 100 -n 100 -o output.jsonl workingDirectory: '$(System.DefaultWorkingDirectory)' - task: Bash@3 diff --git a/include/mscclpp/semaphore_device.hpp b/include/mscclpp/semaphore_device.hpp index 2f948baed..292a8b495 100644 --- a/include/mscclpp/semaphore_device.hpp +++ b/include/mscclpp/semaphore_device.hpp @@ -39,8 +39,7 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { __forceinline__ __device__ void signal() { // This fence ensures that preceding writes are visible on the peer GPU before the incremented // `outboundSemaphoreId` is visible. - asm volatile("fence.acq_rel.sys;" ::: "memory"); - // __threadfence_system(); + __threadfence_system(); semaphoreIncrement(); *remoteInboundSemaphoreId = semaphoreGetLocal(); } diff --git a/test/deploy/perf_ndmv4.jsonl b/test/deploy/perf_ndmv4.jsonl index bec80a949..8d76e9059 100644 --- a/test/deploy/perf_ndmv4.jsonl +++ b/test/deploy/perf_ndmv4.jsonl @@ -7,6 +7,10 @@ {"name":"allreduce", "kernel":3, "ranks":8, "ranksPerNode":8, "algBw":139.08, "busBw":243.40, "size":1073741824, "time":7719.85, "target":"throughput"} {"name":"allreduce", "kernel":4, "ranks":8, "ranksPerNode":8, "algBw":106.98, "busBw":187.22, "size":16777216, "time":156.81, "target":"throughput"} {"name":"allreduce", "kernel":4, "ranks":8, "ranksPerNode":8, "algBw":116.24, "busBw":203.42, "size":33554432, "time":288.65, "target":"throughput"} +{"name":"allreduce", "kernel":5, "ranks":8, "ranksPerNode":8, "algBw":126.52,"busBw":221.418,"size":50331648, "time":397.79, "target":"throughput"} +{"name":"allreduce", "kernel":6, "ranks":8, "ranksPerNode":8, "algBw":3.3919,"busBw":5.9359, "size":24576, "time":7.24, "target":"latency"} +{"name":"allreduce", "kernel":6, "ranks":8, "ranksPerNode":8, "algBw":6.21, "busBw":10.87, "size":49152, "time":7.91, "target":"latency"} +{"name":"allreduce", "kernel":6, "ranks":8, "ranksPerNode":8, "algBw":8.90, "busBw":15.57, "size":73728, "time":8.28, "target":"latency"} {"name":"allreduce", "kernel":4, "ranks":16,"ranksPerNode":8, "algBw":84.55, "busBw":158.53, "size":25165824, "time":297.64, "target":"throughput"} {"name":"allreduce", "kernel":4, "ranks":16,"ranksPerNode":8, "algBw":99.43, "busBw":186.44, "size":50331648, "time":506.16, "target":"throughput"} {"name":"allreduce", "kernel":4, "ranks":16,"ranksPerNode":8, "algBw":124.60, "busBw":233.64, "size":3221225472, "time":25850.67,"target":"throughput"} diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index 09bc9457f..a1e9b9d8f 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -291,6 +291,55 @@ __device__ void localReduceScatterSm(int* buff, int* scratch, int rank, int nRan int4* buff4 = (int4*)buff; + for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { + smChans[peerIdx].signal(); + } + for (int peerIdx = threadIdx.x + blockIdx.x * blockDim.x; peerIdx < nPeer; peerIdx += blockDim.x * nBlocks) { + smChans[peerIdx].wait(); + } + reduceScatterDeviceSyncer.sync(nBlocks); + + const size_t nInt4 = nelems / 4; + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nInt4; idx += blockDim.x * nBlocks) { + int4 sum = make_int4(0, 0, 0, 0); + + for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + int4 val = smChans[peerIdx].read(indexOffset4 + idx); + sum.w += val.w; + sum.x += val.x; + sum.y += val.y; + sum.z += val.z; + } + buff4[indexOffset4 + idx].w += sum.w; 
+ buff4[indexOffset4 + idx].x += sum.x; + buff4[indexOffset4 + idx].y += sum.y; + buff4[indexOffset4 + idx].z += sum.z; + } + + const size_t nLastInts = nelems % 4; + for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nLastInts; idx += blockDim.x * nBlocks) { + int sum = 0; + for (int peerIdx = 0; peerIdx < nPeer; peerIdx++) { + int val = smChans[peerIdx].read(indexOffset + nInt4 * 4 + idx); + sum += val; + } + buff[indexOffset + nInt4 * 4 + idx] += sum; + } +} + +__device__ void localReduceScatterSm2(int* buff, int* scratch, int rank, int nRanksPerNode, size_t chunkSize, + size_t nelems, int nBlocks) { + if (nRanksPerNode == 1) return; + if (blockIdx.x >= nBlocks) return; + const int nPeer = nRanksPerNode - 1; + DeviceHandle* smChans = constSmOutOfPlaceGetChans; + + const size_t localRankIndexInNode = rank % nRanksPerNode; + const size_t indexOffset = localRankIndexInNode * chunkSize; + const size_t indexOffset4 = indexOffset / 4; + + int4* buff4 = (int4*)buff; + const int tid = threadIdx.x + blockIdx.x * blockDim.x; if (tid < nPeer) { smChans[tid].signal(); @@ -817,7 +866,7 @@ __global__ void allreduce4(int* buff, int* scratch, void* result, int rank, int __global__ void allreduce5(int* buff, int* scratch, void* result, int rank, int nRanksPerNode, int worldSize, size_t nelems) { - localReduceScatterSm(buff, scratch, rank, nRanksPerNode, 0, 0, nelems / worldSize, nelems / worldSize, gridDim.x); + localReduceScatterSm2(buff, scratch, rank, nRanksPerNode, nelems / worldSize, nelems / worldSize, gridDim.x); deviceSyncer.sync(gridDim.x); localRingAllGatherSm(rank, nRanksPerNode, nelems / worldSize * sizeof(int), gridDim.x); } @@ -916,7 +965,7 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) { tmpBuff = scratchBuff; nThreadsPerBlock = 1024; } else if (kernelNum == 4) { - nBlocks = 49; + nBlocks = 45; tmpBuff = scratchBuff; nThreadsPerBlock = 512; } else if (kernelNum == 5) { From 11d5fe19b2e874e0e54ce024bcd2549e38bcf201 Mon Sep 17 00:00:00 2001 From: Changho Hwang Date: Wed, 13 Sep 2023 00:18:15 +0000 Subject: [PATCH 9/9] Erase code commented out --- test/mscclpp-test/allreduce_test.cu | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu index a1e9b9d8f..0d8292d86 100644 --- a/test/mscclpp-test/allreduce_test.cu +++ b/test/mscclpp-test/allreduce_test.cu @@ -1015,9 +1015,6 @@ void AllReduceTestColl::initData(const TestArgs& args, std::vector sendBu for (size_t i = 0; i < recvCount_; i++) { dataHost[i] = worldSize * (worldSize - 1) / 2; } - // if (args.rank == 1) { - // printf("data is %d\n", dataHost[0]); - // } std::memcpy(expectedBuff, dataHost.data(), recvCount_ * typeSize_); }
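
Note on the channel-indexing change in PATCH 1: reading from smChans[(localRankIndexInNode + peerIdx) % nPeer] instead of smChans[peerIdx] rotates each rank's starting peer, so the local ranks do not all read from the same remote GPU at the same moment. A minimal host-side sketch of the resulting read order (standalone illustration, not mscclpp code; the rank and peer counts are made up):

#include <cstdio>

int main() {
  const int nRanksPerNode = 8;          // assumed single-node setup, as in the tests
  const int nPeer = nRanksPerNode - 1;
  for (int rank = 0; rank < nRanksPerNode; ++rank) {
    // On a single node, localRankIndexInNode == rank % nRanksPerNode == rank.
    std::printf("rank %d reads peer channels in order:", rank);
    for (int peerIdx = 0; peerIdx < nPeer; ++peerIdx) {
      std::printf(" %d", (rank + peerIdx) % nPeer);  // rotated start spreads the load across peers
    }
    std::printf("\n");
  }
  return 0;
}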
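
The allreduce6 kernel added in PATCH 5 double-buffers its packet scratch space: the low bit of the per-iteration flag selects which half of the first 2 * nPkts packets receives incoming data and which of the two result regions holds the reduced values, so back-to-back calls never overwrite packets a peer may still be reading. The offset selection below reuses the kernel's own expressions; the packet size and nPkts value are placeholders for illustration only:

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
  const std::size_t kPacketBytes = 16;  // stand-in for sizeof(mscclpp::LLPacket)
  const std::size_t nPkts = 1024;       // packets per call, made up for the example
  for (uint32_t flag = 1; flag <= 4; ++flag) {
    // Same selection logic as allreduce6: an odd flag uses the first region,
    // an even flag uses the second, for both incoming packets and reduced results.
    std::size_t scratchBaseOffset = (flag & 1) ? 0 : nPkts * kPacketBytes;
    std::size_t scratchResultOffset =
        (flag & 1) ? 2 * nPkts * kPacketBytes : 3 * nPkts * kPacketBytes;
    std::printf("flag %u: packets at byte %zu, results at byte %zu\n", flag,
                scratchBaseOffset, scratchResultOffset);
  }
  return 0;
}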
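
PATCH 3 replaces the __threadfence_system() in SmDevice2DeviceSemaphoreDeviceHandle::signal() with a fence.acq_rel.sys instruction, and PATCH 8 restores the original fence; in both versions the requirement is the same: writes issued before the signal must be visible to the peer before the incremented semaphore value is. The host-side std::atomic sketch below shows the same release/acquire pairing purely as an analogy; it is not how the device semaphore is implemented:

#include <atomic>
#include <cstdio>
#include <thread>

int payload = 0;                      // stands in for the data written before signal()
std::atomic<int> inboundSemaphore{0};

void signaler() {
  payload = 42;  // "preceding write" that the peer must observe
  // Release ordering plays the role of the fence before the semaphore bump.
  inboundSemaphore.fetch_add(1, std::memory_order_release);
}

void waiter() {
  // Acquire ordering plays the role of wait(): once the new semaphore value is
  // seen, the earlier write to payload is guaranteed to be visible.
  while (inboundSemaphore.load(std::memory_order_acquire) == 0) {
  }
  std::printf("payload = %d\n", payload);  // always prints 42
}

int main() {
  std::thread a(signaler), b(waiter);
  a.join();
  b.join();
  return 0;
}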