diff --git a/.azure-pipelines/integration-test-rocm.yml b/.azure-pipelines/integration-test-rocm.yml index c098ab085..3315dfa9a 100644 --- a/.azure-pipelines/integration-test-rocm.yml +++ b/.azure-pipelines/integration-test-rocm.yml @@ -62,9 +62,8 @@ jobs: targetType: 'inline' script: | set -e - git clone https://$(GIT_USER):$(GIT_PAT)@msazure.visualstudio.com/DefaultCollection/One/_git/azure-mscclpp - cd azure-mscclpp - git checkout binyli/ci + git clone https://$(GIT_USER):$(GIT_PAT)@msazure.visualstudio.com/DefaultCollection/One/_git/msccl-users + cd msccl-users mkdir execution-files python3 algos/allreduce_mi300_packet.py 8 8 > execution-files/allreduce_mi300_packet.json python3 algos/allreduce_mi300_sm_mscclpp.py 8 8 > execution-files/allreduce_mi300_sm_mscclpp.json @@ -90,8 +89,8 @@ jobs: set -e export PATH=/usr/local/mpi/bin:$PATH sudo /usr/local/mpi/bin/mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=$(pwd)/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN \ - -x ALLREDUCEPKT_IP_JSON_FILE=./azure-mscclpp/execution-files/allreduce_mi300_packet.json \ - -x ALLREDUCE_IP_JSON_FILE=./azure-mscclpp/execution-files/allreduce_mi300_sm_mscclpp.json \ + -x ALLREDUCEPKT_IP_JSON_FILE=./msccl-users/execution-files/allreduce_mi300_packet.json \ + -x ALLREDUCE_IP_JSON_FILE=./msccl-users/execution-files/allreduce_mi300_sm_mscclpp.json \ -x ALLREDUCE_SMALL_MSG_BOUNDARY=32K -x ALLREDUCE_LARGE_MSG_BOUNDARY=1M ./rccl-tests/build/all_reduce_perf \ -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 100 workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 7295171e9..73496445d 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -9,11 +9,11 @@ on: - cron: "30 1 * * 1" jobs: - analyze: - name: Analyze + analyze-cuda: + name: Analyze (CUDA) runs-on: 'ubuntu-latest' container: - image: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-${{ matrix.cuda-version }} + image: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-${{ matrix.version }} permissions: actions: read @@ -24,7 +24,7 @@ jobs: fail-fast: false matrix: language: [ 'cpp', 'python' ] - cuda-version: [ 'cuda11.8', 'cuda12.2' ] + version: [ 'cuda11.8', 'cuda12.2' ] steps: - name: Checkout repository @@ -45,10 +45,56 @@ jobs: - name: Build run: | - cmake -DBYPASS_GPU_CHECK=ON -DUSE_CUDA=ON . + rm -rf build && mkdir build && cd build + cmake -DBYPASS_GPU_CHECK=ON -DUSE_CUDA=ON .. make -j - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v2 with: - category: "/language:${{matrix.language}}/cuda-version:${{matrix.cuda-version}}" + category: "/language:${{matrix.language}}/version:${{matrix.version}}" + + analyze-rocm: + name: Analyze (ROCm) + runs-on: 'ubuntu-latest' + container: + image: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-${{ matrix.version }} + + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'cpp', 'python' ] + version: [ 'rocm6.2' ] + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Check disk space + run: | + df -h + + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + + - name: Dubious ownership exception + run: | + git config --global --add safe.directory /__w/mscclpp/mscclpp + + - name: Build + run: | + rm -rf build && mkdir build && cd build + CXX=/opt/rocm/bin/hipcc cmake -DBYPASS_GPU_CHECK=ON -DUSE_ROCM=ON .. + make -j + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 + with: + category: "/language:${{matrix.language}}/version:${{matrix.version}}" diff --git a/.github/workflows/doc-build.yaml b/.github/workflows/doc-build.yaml new file mode 100644 index 000000000..78af009e8 --- /dev/null +++ b/.github/workflows/doc-build.yaml @@ -0,0 +1,34 @@ +name: Docs Build + +on: + pull_request: + branches: + - '**' + +permissions: + contents: read + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y doxygen graphviz + pip install -r docs/requirements.txt + + - name: Build docs + run: | + cd docs + doxygen + make html + touch _build/html/.nojekyll diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml new file mode 100644 index 000000000..1c2645edf --- /dev/null +++ b/.github/workflows/gh-pages.yml @@ -0,0 +1,58 @@ +name: GitHub Pages + +on: + push: + branches: + - main + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages +permissions: + contents: read + pages: write + id-token: write + +# Allow only one concurrent deployment, skipping runs queued between the run in-progress and latest queued. +# However, do NOT cancel in-progress runs as we want to allow these production deployments to complete. +concurrency: + group: "pages" + cancel-in-progress: false + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Setup python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y doxygen graphviz + pip install -r docs/requirements.txt + - name: Build docs + run: | + cd docs + doxygen + make html + touch _build/html/.nojekyll + - name: Upload artifacts + uses: actions/upload-pages-artifact@v3 + with: + path: docs/_build/html + + deploy: + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + runs-on: ubuntu-latest + needs: build + steps: + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 000000000..11da73bb2 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,36 @@ +# Read the Docs configuration file for Sphinx projects +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the OS, Python version and other tools you might need +build: + os: ubuntu-22.04 + apt_packages: + - doxygen + tools: + python: "3.12" + jobs: + pre_build: + - cd docs && doxygen + +# Build documentation in the "docs/" directory with Sphinx +sphinx: + configuration: docs/conf.py + # You can configure Sphinx to use a different builder, for instance use the dirhtml builder for simpler URLs + # builder: "dirhtml" + # Fail on all warnings to avoid broken references + # fail_on_warning: true + +# Optionally build your docs in additional formats such as PDF and ePub +# formats: +# - pdf +# - epub + +# Optional but recommended, declare the Python requirements required +# to build your documentation +# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html +python: + install: + - requirements: docs/requirements.txt diff --git a/README.md b/README.md index 9796179d3..cd8e80790 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,7 @@ |--------------------------|-------------------| | Unit Tests (CUDA) | [![Build Status](https://dev.azure.com/binyli/HPC/_apis/build/status%2Fmscclpp-ut?branchName=main)](https://dev.azure.com/binyli/HPC/_build/latest?definitionId=4&branchName=main) | | Integration Tests (CUDA) | [![Build Status](https://dev.azure.com/binyli/HPC/_apis/build/status%2Fmscclpp-test?branchName=main)](https://dev.azure.com/binyli/HPC/_build/latest?definitionId=3&branchName=main) | - -*NOTE (Nov 2023): Azure pipelines for ROCm will be added soon.* +| Integration Tests (ROCm) | [![Build Status](https://dev.azure.com/binyli/HPC/_apis/build/status%2Fmscclpp-test-rocm?branchName=main)](https://dev.azure.com/binyli/HPC/_build/latest?definitionId=7&branchName=main) | A GPU-driven communication stack for scalable AI applications. diff --git a/apps/nccl/src/allgather.hpp b/apps/nccl/src/allgather.hpp index 4e7441249..4c417c312 100644 --- a/apps/nccl/src/allgather.hpp +++ b/apps/nccl/src/allgather.hpp @@ -103,8 +103,6 @@ __global__ void __launch_bounds__(1024, 1) } } - //deviceSyncer.sync(gridDim.x); - if (threadIdx.x < nPeer) { smChans[threadIdx.x].relaxedSignal(); smChans[threadIdx.x].wait(); @@ -116,21 +114,16 @@ template cudaError_t allgather(T* buff, [[maybe_unused]] T* scratch, [[maybe_unused]] T* resultBuff, mscclpp::DeviceHandle* smChannels, size_t channelOutOffset, int rank, int nRanksPerNode, int worldSize, size_t nelems, cudaStream_t stream) { - int nBlocks = 28; - if (nelems <= 4096) { - nBlocks = 7; - } - else if (nelems <= 32768) { - nBlocks = 14; + nBlocks = 7; + } else if (nelems <= 32768) { + nBlocks = 14; } else if (nelems >= 2097152) { - nBlocks = 35; + nBlocks = 35; } - - allgather6<<>>((void*)buff, smChannels, channelOutOffset, rank, worldSize, - nRanksPerNode, nelems * sizeof(T) / sizeof(int)); + nRanksPerNode, nelems * sizeof(T) / sizeof(int)); return cudaGetLastError(); } diff --git a/apps/nccl/src/allreduce.hpp b/apps/nccl/src/allreduce.hpp index 2ff23fb72..c4c1b1a5e 100644 --- a/apps/nccl/src/allreduce.hpp +++ b/apps/nccl/src/allreduce.hpp @@ -14,7 +14,6 @@ #include "common.hpp" #include "gpu_data_types.hpp" - template __forceinline__ __device__ To bit_cast(const From& src) { static_assert(sizeof(To) == sizeof(From), "Size mismatch for bit_cast"); @@ -44,7 +43,6 @@ template <> __forceinline__ __device__ __half2 clip(__half2 val) { val.x = __hmax(val.x, bit_cast<__half, unsigned short>(0xfbff)); val.x = __hmin(val.x, bit_cast<__half, unsigned short>(0x7bff)); - val.y = __hmax(val.y, bit_cast<__half, unsigned short>(0xfbff)); val.y = __hmin(val.y, bit_cast<__half, unsigned short>(0x7bff)); return val; @@ -192,17 +190,6 @@ __forceinline__ __device__ void vectorSum(T* dst, T* src, size_t nElem) { vectorSum(dst, src, nElem, blockIdx.x, gridDim.x); } -// template -// __global__ void __launch_bounds__(32, 1) -// test(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle* smChannels, -// size_t channelDataOffset, size_t channelScratchOffset, int rank, int nRanksPerNode, int worldSize, -// size_t nelems, uint32_t flag) { -// // add 0.1f to all elements -// for (size_t i = threadIdx.x + blockIdx.x * gridDim.x; i < nelems; i += blockDim.x * gridDim.x) { -// buff[i] = add_elements(buff[i], T(0.1f)); -// } -// } - template __global__ void __launch_bounds__(32, 1) allreduceAllToAll(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle* smChannels, @@ -254,19 +241,19 @@ __global__ void __launch_bounds__(1024, 1) size_t nelems, uint32_t flag) { // This version of allreduce only works for single nodes if (worldSize != nRanksPerNode) return; - + if (sizeof(T) == 2) - nelems = (nelems * sizeof(T) + sizeof(T)) / sizeof(int); + nelems = (nelems * sizeof(T) + sizeof(T)) / sizeof(int); else - nelems = nelems / (sizeof(int) / sizeof(T)); - + nelems = nelems / (sizeof(int) / sizeof(T)); + const int nPeers = nRanksPerNode - 1; - const size_t nPkts = nelems/2; - + const size_t nPkts = nelems / 2; + int nelemsPerRank = nelems / worldSize; if ((nelemsPerRank % 2)) nelemsPerRank = (nelemsPerRank * sizeof(T) + sizeof(T)) / sizeof(T); - const int nPktsPerRank = nelemsPerRank/2; + const int nPktsPerRank = nelemsPerRank / 2; // thread block & channel info const int nBlocksPerPeer = gridDim.x / nPeers; const int localBlockIdx = blockIdx.x % nBlocksPerPeer; @@ -291,29 +278,22 @@ __global__ void __launch_bounds__(1024, 1) // step 1: write to scratch buffer channels[peerIdx].putPackets(scratchOffset, srcOffset, nelemsPerRank * sizeof(int), tid, - blockDim.x * nBlocksPerPeer, flag); + blockDim.x * nBlocksPerPeer, flag); // step 2: get data from scratch buffer, reduce data and write result to remote scratch buffer for (int idx = threadIdx.x + blockIdx.x * blockDim.x; idx < nPktsPerRank; idx += blockDim.x * gridDim.x) { - //uint32_t data = 0; - uint2 data = make_uint2(0, 0); + uint2 data = src[idx]; for (int index = 0; index < NPEERS; index++) { const int remoteRank = index < rank ? index : index + 1; mscclpp::LLPacket* dstPkt = (mscclpp::LLPacket*)scratchBuff + remoteRank * nPktsPerRank; - //uint32_t val = dstPkt[idx].read(flag, -1); uint2 val = dstPkt[idx].read(flag); - //data = add_vectors(val, data); data.x = add_vectors(val.x, data.x); - data.y = add_vectors(val.y, data.y); + data.y = add_vectors(val.y, data.y); } - data.x = add_vectors(data.x, src[idx].x); - data.y = add_vectors(data.y, src[idx].y); dst[idx].x = data.x; dst[idx].y = data.y; mscclpp::LLPacket packet; - /*packet.data = data; - packet.flag = flag;*/ packet.data1 = data.x; packet.flag1 = flag; packet.data2 = data.y; @@ -467,10 +447,6 @@ cudaError_t allreduce(T* buff, T* scratch, T* resultBuff, mscclpp::DeviceHandle< size_t nelems, cudaStream_t stream) { static uint32_t flag = 1; - // test<<<7, 32, 0, stream>>>(buff, scratch, resultBuff, smChannels, channelInOffset, - // channelScratchOffset, rank, nRanksPerNode, worldSize, - // nelems, flag++); - if (sizeof(T) * nelems < worldSize * sizeof(int)) { int nBlocks = 7; int nThreadsPerBlock = 32; diff --git a/apps/nccl/src/common.hpp b/apps/nccl/src/common.hpp index af8ef1785..015e0a2f6 100644 --- a/apps/nccl/src/common.hpp +++ b/apps/nccl/src/common.hpp @@ -4,6 +4,8 @@ #ifndef NCCL_COMMON_HPP_ #define NCCL_COMMON_HPP_ +#include + #if defined(__HIP_PLATFORM_AMD__) #define WARP_SIZE 64 #define __syncwarp() __builtin_amdgcn_wave_barrier() @@ -16,8 +18,6 @@ constexpr int NPEERS = 7; constexpr int SCRATCH_SIZE = 2 * 1024 * 1024 * 70; // double buffer * 35 thread-blocks * 8 ranks * 256KB = 70MB -#include - __device__ mscclpp::DeviceSyncer deviceSyncer; #endif // NCCL_COMMON_HPP_ diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index cb0e7d56e..a414ffe85 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -293,9 +293,10 @@ NCCL_API ncclResult_t ncclGetUniqueId(ncclUniqueId* uniqueId) { return ncclSuccess; } -NCCL_API ncclResult_t ncclCommInitRankConfig(ncclComm_t*, int, ncclUniqueId, int, ncclConfig_t*) { - // TODO: implement this function - return ncclInternalError; +NCCL_API ncclResult_t ncclCommInitRankConfig(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank, + ncclConfig_t*) { + // TODO: implement config + return ncclCommInitRank(comm, nranks, commId, rank); } NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank) { @@ -419,7 +420,7 @@ NCCL_API const char* ncclGetErrorString(ncclResult_t result) { NCCL_API const char* ncclGetLastError(ncclComm_t) { // TODO: implement this function - return nullptr; + return ""; } NCCL_API ncclResult_t ncclCommGetAsyncError(ncclComm_t, ncclResult_t* asyncError) { @@ -488,29 +489,30 @@ NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t std::shared_ptr plan; if (bytes <= comm->largeMessageSizeBoundary) plan = (sendbuff == recvbuff) ? comm->allReducePacketIPPlan : comm->allReducePacketOPPlan; - else + else { plan = (sendbuff == recvbuff) ? comm->allReduceIPPlan : comm->allReduceOPPlan; + } if (plan == nullptr) return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); switch (datatype) { case ncclFloat16: - comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, 1024, - *plan, stream, mscclpp::PacketType::LL8); + comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, *plan, + stream, mscclpp::PacketType::LL8); break; case ncclFloat32: comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT32, - 1024, *plan, stream, mscclpp::PacketType::LL8); + *plan, stream, mscclpp::PacketType::LL8); break; case ncclBfloat16: comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes, - mscclpp::DataType::BFLOAT16, 1024, *plan, stream, mscclpp::PacketType::LL8); + mscclpp::DataType::BFLOAT16, *plan, stream, mscclpp::PacketType::LL8); break; case ncclInt32: case ncclUint32: - comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, 1024, - *plan, stream, mscclpp::PacketType::LL8); + comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, *plan, + stream, mscclpp::PacketType::LL8); break; default: return ncclInvalidArgument; diff --git a/docker/base-dev-x.dockerfile b/docker/base-dev-x.dockerfile index d6236cd02..5aeaa4142 100644 --- a/docker/base-dev-x.dockerfile +++ b/docker/base-dev-x.dockerfile @@ -2,7 +2,7 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} LABEL maintainer="MSCCL++" -LABEL org.opencontainers.image.source https://github.com/microsoft/mscclpp +LABEL org.opencontainers.image.source=https://github.com/microsoft/mscclpp RUN apt-get update && \ apt-get install -y --no-install-recommends \ diff --git a/docker/base-x-rocm.dockerfile b/docker/base-x-rocm.dockerfile index 5865cc397..cffa68cc5 100644 --- a/docker/base-x-rocm.dockerfile +++ b/docker/base-x-rocm.dockerfile @@ -2,7 +2,7 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} LABEL maintainer="MSCCL++" -LABEL org.opencontainers.image.source https://github.com/microsoft/mscclpp +LABEL org.opencontainers.image.source=https://github.com/microsoft/mscclpp ENV DEBIAN_FRONTEND=noninteractive diff --git a/docker/base-x.dockerfile b/docker/base-x.dockerfile index a1ba20693..45f39c709 100644 --- a/docker/base-x.dockerfile +++ b/docker/base-x.dockerfile @@ -2,7 +2,7 @@ ARG BASE_IMAGE FROM ${BASE_IMAGE} LABEL maintainer="MSCCL++" -LABEL org.opencontainers.image.source https://github.com/microsoft/mscclpp +LABEL org.opencontainers.image.source=https://github.com/microsoft/mscclpp ENV DEBIAN_FRONTEND=noninteractive USER root @@ -17,6 +17,7 @@ RUN apt-get update && \ git \ libcap2 \ libnuma-dev \ + lsb-release \ openssh-client \ openssh-server \ python3-dev \ @@ -31,11 +32,13 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* /tmp/* # Install OFED -ENV OFED_VERSION=5.2-2.2.3.0 +ARG OFED_VERSION=5.2-2.2.3.0 RUN cd /tmp && \ - wget -q https://content.mellanox.com/ofed/MLNX_OFED-${OFED_VERSION}/MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu20.04-x86_64.tgz && \ - tar xzf MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu20.04-x86_64.tgz && \ - MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu20.04-x86_64/mlnxofedinstall --user-space-only --without-fw-update --force --all && \ + OS_VERSION=$(lsb_release -rs) && \ + OS_VERSION=ubuntu${OS_VERSION} && \ + wget -q https://content.mellanox.com/ofed/MLNX_OFED-${OFED_VERSION}/MLNX_OFED_LINUX-${OFED_VERSION}-${OS_VERSION}-x86_64.tgz && \ + tar xzf MLNX_OFED_LINUX-${OFED_VERSION}-${OS_VERSION}-x86_64.tgz && \ + MLNX_OFED_LINUX-${OFED_VERSION}-${OS_VERSION}-x86_64/mlnxofedinstall --user-space-only --without-fw-update --without-ucx-cuda --force --all && \ rm -rf /tmp/MLNX_OFED_LINUX-${OFED_VERSION}* # Install OpenMPI diff --git a/docker/build.sh b/docker/build.sh index d8af5f8fd..3e2169f68 100755 --- a/docker/build.sh +++ b/docker/build.sh @@ -8,6 +8,7 @@ baseImageTable=( ["cuda12.1"]="nvidia/cuda:12.1.1-devel-ubuntu20.04" ["cuda12.2"]="nvidia/cuda:12.2.2-devel-ubuntu20.04" ["cuda12.3"]="nvidia/cuda:12.3.2-devel-ubuntu20.04" + ["cuda12.4"]="nvidia/cuda:12.4.1-devel-ubuntu22.04" ["rocm6.2"]="rocm/rocm-terminal:6.2" ) @@ -20,11 +21,16 @@ extraLdPathTable=( ["rocm6.2"]="/opt/rocm/lib" ) +declare -A ofedVersionTable +ofedVersionTable=( + ["cuda12.4"]="23.07-0.5.1.2" +) + GHCR="ghcr.io/microsoft/mscclpp/mscclpp" TARGET=${1} print_usage() { - echo "Usage: $0 [cuda11.8|cuda12.1|cuda12.2|cuda12.3|rocm6.2]" + echo "Usage: $0 [cuda11.8|cuda12.1|cuda12.2|cuda12.3|cuda12.4|rocm6.2]" } if [[ ! -v "baseImageTable[${TARGET}]" ]]; then @@ -38,11 +44,18 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" cd ${SCRIPT_DIR}/.. +DEFAULT_OFED_VERSION="5.2-2.2.3.0" +OFED_VERSION=${ofedVersionTable[${TARGET}]} +if [[ -z ${OFED_VERSION} ]]; then + OFED_VERSION=${DEFAULT_OFED_VERSION} +fi + docker build -t ${GHCR}-common:base-${TARGET} \ -f docker/base-x.dockerfile \ --build-arg BASE_IMAGE=${baseImageTable[${TARGET}]} \ --build-arg EXTRA_LD_PATH=${extraLdPathTable[${TARGET}]} \ - --build-arg TARGET=${TARGET} . + --build-arg TARGET=${TARGET} \ + --build-arg OFED_VERSION=${OFED_VERSION} . if [[ ${TARGET} == rocm* ]]; then echo "Building ROCm base image..." diff --git a/docs/.gitignore b/docs/.gitignore index 00d9344fb..a69fac7ab 100644 --- a/docs/.gitignore +++ b/docs/.gitignore @@ -1,3 +1,2 @@ doxygen/ _build/ -sphinx/ diff --git a/docs/README.md b/docs/README.md index 2bb9c1efb..80d80b16c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -3,13 +3,13 @@ 1. Install `doxygen`. ```bash - $ sudo apt-get install doxygen + $ sudo apt-get install doxygen graphviz ``` 2. Install Python packages below. If you install them on the user's local, you need to include `~/.local/bin` to `$PATH` (to use `sphinx-build`). ```bash - $ sudo python3 -m pip install sphinx sphinx_rtd_theme breathe + $ sudo python3 -m pip install -r ./requirements.txt ``` 3. Create Doxygen documents. @@ -21,7 +21,7 @@ 4. Create Sphinx documents. ```bash - $ sphinx-build -b html -Dbreathe_projects.mscclpp=$PWD/doxygen/xml $PWD $PWD/sphinx + $ make html ``` -5. Done. The HTML files will be on `sphinx/` directory. +5. Done. The HTML files will be on `_build/` directory. diff --git a/docs/api/index.rst b/docs/api/index.rst new file mode 100644 index 000000000..461a9fbdb --- /dev/null +++ b/docs/api/index.rst @@ -0,0 +1,5 @@ +API Reference +============= + +.. doxygennamespace:: mscclpp + :members: diff --git a/docs/conf.py b/docs/conf.py index 4a94b3aa5..4d3a91022 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -14,12 +14,13 @@ # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration -extensions = ["breathe"] +extensions = ["breathe", "myst_parser"] templates_path = ["_templates"] exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # Breathe configuration +breathe_projects = {"mscclpp": "./doxygen/xml"} breathe_default_project = "mscclpp" # -- Options for HTML output ------------------------------------------------- diff --git a/docs/design/design.md b/docs/design/design.md new file mode 100644 index 000000000..82b6e0965 --- /dev/null +++ b/docs/design/design.md @@ -0,0 +1,157 @@ +# MSCCL++ Design Document +## Introduction +MSCCL++ redefines inter-GPU communication interfaces, thereby delivering a highly efficient and customizable communication stack for distributed GPU applications. Its design is specifically tailored to accommodate diverse performance optimization scenarios often encountered in state-of-the-art AI applications. The figure below provides a high-level overview of MSCCL++ abstractions in CUDA, C, and Python. + + +```{figure} ../figs/abstractions.png +:name: MSCCL++ Abstractions +:alt: MSCCL++ Abstractions +:align: center + +MSCCL++ Abstractions Overview +``` + +The followings highlight the key features of MSCCL++. +* **Light-weight and multi-layer abstractions.** MSCCL++ provides communication abstractions at lowest level close to hardware and at the highest level close to application API. The lowest level of abstraction is ultra light weight which enables a user to implement logics of data movement for a collective operation such as AllReduce inside a GPU kernel extremely efficiently without worrying about memory ordering of different ops. The modularity of MSCCL++ enables a user to construct the building blocks of MSCCL++ in a high level abstraction in Python and feed them to a CUDA kernel in order to facilitate the user's productivity. + +* **1-sided 0-copy synchronous and asynchronous abstracts.** MSCCL++ provides fine-grained synchronous and asynchronous 0-copy 1-sided abstracts for communication primitives such as `put()`, `get()`, `signal()`, `flush()`, and `wait()`. The 1-sided abstractions allows a user to asynchronously `put()` their data on the remote GPU as soon as it is ready without requiring the remote side to issue any receive instruction. This enables users to easily implement flexible communication logics, such as overlapping communication with computation, or implementing customized collective communication algorithms without worrying about potential deadlocks. Additionally, the 0-copy capability enables MSCCL++ to directly transfer data between user's buffers without using intermediate internal buffers which saves GPU bandwidth and memory capacity. + +* **Unified abstractions for different interconnection hardware.** MSCCL++ provides consistent abstractions regardless of the location of the remote GPU (either on the local node or on a remote node) or the underlying link (either NVLink/xGMI or InfiniBand). This simplifies the code for inter-GPU communication, which is often complex due to memory ordering of GPU/CPU read/writes and therefore, is error-prone. + +## Concepts + +To implement the list of features above, some concepts are introduced. +### Channel +MSCCL++ provides peer-to-peer communication methods between GPUs. A peer-to-peer connection between two GPUs is called a *Channel*. Channels are constructed by MSCCL++ host-side interfaces and copied to GPUs during initialization. Channels provide *GPU-side interfaces*, which means that all communication methods are defined as a device function to be called from a GPU kernel code. Following code shows the basic usage for channel, the `put()` method in the following code copies 1KB data from the local GPU to a remote GPU. +```cpp +__global__ void gpuKernel() { + ... + // Only one thread is needed for this method. + channel.put(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024); + ... +} +``` +MSCCL++ also provides efficient synchronization methods, `signal()`, `flush()`, and `wait()`. We will discuss these methods in the following sections. + +#### SmChannel & ProxyChannel +MSCCL++ delivers two types of channels, **ProxyChannel** and **SmChannel**. `ProxyChannel` provides (R)DMA-based data copy and synchronization methods. When called, these methods send/receive a signal to/from a host-side proxy (hence the name `ProxyChannel`), which will trigger (R)DMA (such as `cudaMemcpy*` or `ibv_post_send`) or issue synchronization methods (such as `cudaStreamSynchronize` or `ibv_poll_cq`). Since the key functionalities are run by the proxy, ProxyChannel requires only a single GPU thread to call its methods. See all `ProxyChannel` methods from [here](https://github.com/microsoft/mscclpp/blob/main/include/mscclpp/proxy_channel_device.hpp). + +On the other hand, `SmChannel` provides memory-mapping-based copy and synchronization methods. When called, these methods will directly use GPU threads to read/write from/to the remote GPU's memory space. Comparing against ProxyChannel, SmChannel is especially performant for low-latency scenarios, while it may need many GPU threads to call copying methods at the same time to achieve high copying bandwidth. See all SmChannel methods from [here](https://github.com/microsoft/mscclpp/blob/main/include/mscclpp/sm_channel_device.hpp). + +### Fifo & Trigger +One of the key features of MSCCL++ is to offload the communication logic from the GPU to the CPU. +To offload the communication logic from the GPU to the CPU, MSCCL++ introduces the concept of `Fifo` and `Trigger`. A Fifo is a circular buffer that shared between the GPU and the CPU. It is used to store `Trigger`. A `Trigger` is a signal that is sent from the GPU to the CPU to notify the CPU that there are commands in the Fifo that need to be processed. The CPU will then process the commands in the Fifo and send a signal back to the GPU to notify the GPU that the commands have been processed. The implementation details of Fifo and Trigger can be found in following sections. + +### ProxyService +Proxy service is a persistent service that resides in the CPU side. It functions as a polling service that receives the message `Trigger` from the GPU side and then transfers data according to the command. When we use `ProxyChannel` for communication, a `Trigger` is sent from the GPU side to the `ProxyService`. Then `ProxyService` will invoke `cudaMemcpy*` or `IB verbs` to transfer data to the targe device. + +## Implementation + +The core of MSCCL++ is implemented in C++ and CUDA. We offer both C++ and Python APIs for initializing communication channels. For interactions within the GPU kernel, we offer a collection of low-level device functions. Subsequent sections will delve into these interfaces and the methodology for transferring communication logic from the GPU to the CPU. + +### Interfaces +This section delivers a comprehensive overview of the MSCCL++ interfaces, encompassing both the setup and initialization of communication channels and the MSCCL++ kernel programming model. + +#### Communication setup and initialization APIs +MSCCL++ provides APIs in both C++ and Python for establishing communication channels, with further information available in the [Initialization](../getting-started/tutorials/initialization.md) section. Presently, it supports two types of transports: `cudaIPC` for `NVLink/xGMI`, and `IB` for `InfiniBand`. Users are empowered to select the connection type that best suits their hardware infrastructure. + +#### MSCCL++ kernel programming model +MSCCL++ offers one-sided communication methods directly callable from a GPU kernel, encompassing two primary API categories: data copy and synchronization. The data copy API features functions such as `put()`, `get()`, `read()`, and `write()`, while the synchronization API comprises `signal()`, `flush()`, and `wait()`. Demonstrated below, the basic utilization of the data copy API involves the `put()` method, which facilitates the transfer of 1KB of data from a local GPU to a remote GPU. Then send a signal to remote peer to notify the data is ready to use. To receive the data, the remote peer can call `wait()` method. +This operation is executed within a kernel launched with a single block. +```cpp +// Running on rank 0 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChannel) { + smChannel[0].put(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024, /*threadId*/ threadIdx.x, /*numThreads*/ blockDim.x); + __syncthreads(); + if (threadIdx.x == 0) { + smChannel[0].signal(); + } +} + +// Running on rank 1 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChannel) { + if (threadIdx.x == 0) { + smChannel[0].wait(); + } + __syncthreads(); + // Data is ready to use +} +``` + +Similar to the LL protocol offered by NCCL, MSCCL++ introduces a `Packet` structure designed to facilitate the transfer of both data and flags within a single instruction, proving particularly beneficial for applications where latency is a critical concern. The following code shows the basic usage of the `Packet` structure. The flag should be same for sender and receiver side. +```cpp +// Running on rank 0 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChans, int flag) { + smChans[0].putPackets(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024, /*threadId*/ threadIdx.x, /*numThreads*/ blockDim.x, + /*flag=*/ flag); +} + +// Running on rank 1 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChans, int flag) { + smChans[0].getPackets(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024, /*threadId*/ threadIdx.x, /*numThreads*/ blockDim.x, + /*flag=*/ flag); + // Data is ready to use +} +``` + +### The mechanism for offloading communication logic from the GPU to the CPU + +As mentioned in the previous section, the offloading of communication logic from the GPU to the CPU is accomplished through the `Fifo` and `Trigger` mechanism. + +The accompanying figure details the structure of `Tigger`, employing three bits to denote the operation type: `data transfer`, `signal`, and `flush`. The remaining fields specify the precise data locations for both local and remote buffers. + +``` +|-------------------|-------------------|-------------------|-----------------|-----------------|---------|-------------------|---------------| +| 32bit size | 32bit src offset | 32bit dst offset | 9bit src mem id | 9bit dst mem id | 3bit op | 10bit channel id | 1bit reserved | +|-------------------|-------------------|-------------------|-----------------|-----------------|---------|-------------------|---------------| +``` +
The proxy trigger format
+ +Page-locked memory is utilized for the `Fifo`, guaranteeing access by both the CPU and GPU. On the CPU side, a polling thread periodically checks the Fifo for new commands. Upon processing a command, it updates an incremented counter to signal to the GPU that the command has been executed. Users wishing to ensure a command has been processed can invoke `flush()`, which waits for the device-side counter to reflect this update. + +## Use Cases + +In this section, we will discuss several use cases that demonstrate the capabilities of MSCCL++. + +### Overlapping communication with computation + +MSCCL++ enables the offloading of communication logic from the GPU to the CPU, facilitating the overlapping of communication and computation processes. The code snippet provided illustrates this overlapping technique. In the depicted scenario, the GPU emits a signal to the CPU indicating readiness for data transfer. Subsequently, while the GPU continues to execute computation tasks, the CPU initiates the data transfer to the designated target device. +```cpp +__device__ void gpuKernel(mscclpp::SimpleProxyChannelDeviceHandle* proxyChannel) { + int tid = threadIdx.x + blockIdx.x * blockDim.x; + // Send a trigger to the CPU + if (tid == 0) { + proxyChannel[0].putWithSignal(/*dstOffset*/ 0, /*srcOffset*/ 0, /*size*/ 1024); + } + // Continue computation + matrixMul() + // ... +} +``` + +### Fusion of communication and computation + +Traditional communication libraries enforce a separation between communication and computation, creating a bottleneck where communication must await the completion of computation, especially when data dependencies exist. In contrast, MSCCL++ leverages its low-level premitives to facilitate the seamless integration of communication with computation. By segmenting the computation into tiles, MSCCL++ enables the simultaneous pipelining of computation and communication tasks. This approach not only mitigates the communication delay by overlapping processes but also significantly improves throughput by leveraging the low-level API for fine-grained control over the hardware, ensuring optimal efficiency. + +### Implementing customized collective communication algorithms + +MCSCL++ offers a low-level communication API, allowing users to design customized collective communication algorithms. The following code demonstrates how to implement a customized All2All algorithm using MSCCL++. +```cpp +using DeviceHandle = mscclpp::DeviceHandle; +__device__ void localAlltoall(DeviceHandle* proxyChans, int rank, + int nRanksPerNode, size_t nElements) { + int remoteRank = ((int)blockIdx.x < rank) ? blockIdx.x : blockIdx.x + 1; + for (int i = 1; i < nRanksPerNode; i++) { + DeviceHandle proxyChan = proxyChans[blockIdx.x]; + if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank + i) % nRanksPerNode) { + proxyChan.putWithSignalAndFlush(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), + nElements * sizeof(int)); + } + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank - i + nRanksPerNode) % nRanksPerNode) { + proxyChan.wait(); + } + deviceSyncer.sync(nRanksPerNode - 1); + } +} +``` diff --git a/apps/nccl/README.md b/docs/design/nccl-over-mscclpp.md similarity index 55% rename from apps/nccl/README.md rename to docs/design/nccl-over-mscclpp.md index 364feeed2..ca362e9b5 100644 --- a/apps/nccl/README.md +++ b/docs/design/nccl-over-mscclpp.md @@ -1,6 +1,7 @@ -## NCCL Over MSCCL++ +# NCCL Over MSCCL++ -### Limitations +(limitations)= +## Limitations Current NCCL over MSCCL++ has a few limitations. @@ -8,7 +9,8 @@ Current NCCL over MSCCL++ has a few limitations. * Multi-node communication is not supported yet. * Currently, collective communication functions may not work correctly if the buffer address is differed from that of previous function calls while sharing the same base address (returned by [cuMemGetAddressRange](https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__MEM.html#group__CUDA__MEM_1g64fee5711274a2a0573a789c94d8299b)) with the previous address. This is because the current implementation performs zero-copy communication over user buffers, and it is difficult to efficiently inform all ranks if the buffer address dynamically changes. -### API Support Table +(api-support-table)= +## API Support Table The table below lists all NCCL APIs (v2.21). We may cover more APIs in the future. @@ -44,3 +46,26 @@ The table below lists all NCCL APIs (v2.21). We may cover more APIs in the futur | ncclRecv | X | | ncclRedOpCreatePreMulSum | X | | ncclRedOpDestroy | X | + +## Executor Support + +The executor is a versatile tool designed to specify how mscclpp executes algorithms. Currently, only the allReduce operation allows for algorithm customization. The following environment variables can be managed: + +- ALLREDUCEPKT_IP_JSON_FILE: Specifies the path to the JSON file that defines the algorithm for small-sized, in-place operations. +- ALLREDUCEPKT_OP_JSON_FILE: Specifies the path to the JSON file that defines the algorithm for small-sized, out-of-place operations. +- ALLREDUCE_IP_JSON_FILE: Specifies the path to the JSON file that defines the algorithm for larger-sized, in-place operations. +- ALLREDUCE_OP_JSON_FILE: Specifies the path to the JSON file that defines the algorithm for larger-sized, out-of-place operations. +- ALLREDUCE_SMALL_MSG_BOUNDARY: Defines the size threshold at which the algorithm will switch between fallback code and the customized algorithm for small messages. +- ALLREDUCE_LARGE_MSG_BOUNDARY: Defines the size threshold at which the algorithm will switch between the customized algorithm for small messages and that for larger messages. + +```{figure} ../figs/size_boundary_diagram.png +:name: MMSCCL++ Abstractions +:alt: MSCCL++ Abstractions +:align: center + +Decision Flowchart for Message Size-Based Algorithm Execution +``` + +This is an example of executing the interface with the executor: +``` bash +mpirun -np 8 -x ALLREDUCEPKT_IP_JSON_FILE=/root/azure-mscclpp/nccl/test/execution-files/allreducepacket.json -x ALLREDUCE_IP_JSON_FILE=/root/azure-mscclpp/nccl/test/execution-files/allreducesm.json -x ALLREDUCE_SMALL_MSG_BOUNDARY=16K -x ALLREDUCE_LARGE_MSG_BOUNDARY=1M ./apps/nccl/test/nccl_api_test diff --git a/docs/figs/size_boundary_diagram.png b/docs/figs/size_boundary_diagram.png new file mode 100644 index 000000000..41e3a38cb Binary files /dev/null and b/docs/figs/size_boundary_diagram.png differ diff --git a/docs/quickstart.md b/docs/getting-started/quickstart.md similarity index 90% rename from docs/quickstart.md rename to docs/getting-started/quickstart.md index d09ce0baf..8c0982e3e 100644 --- a/docs/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -20,6 +20,17 @@ lsmod | grep nvidia_peermem ``` +## Build with Docker Images + +We provide docker images which package all prerequisites for MSCCL++. You can setup your dev environment with the following command. + +```bash +$ docker run -it --privileged --net=host --ipc=host --gpus all ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2 mscclpp-dev bash +``` + +See all available images [here](https://github.com/microsoft/mscclpp/pkgs/container/mscclpp%2Fmscclpp). + +(build-from-source)= ## Build from Source CMake 3.25 or later is required. @@ -54,6 +65,7 @@ $ make -j mscclpp mscclpp_static $ sudo make install/fast ``` +(install-from-source-python-module)= ## Install from Source (Python Module) Python 3.8 or later is required. @@ -101,7 +113,7 @@ $ mpirun -np 16 -npernode 8 -hostfile hostfile ./test/mp_unit_tests -ip_port 10. ### Python Benchmark -[Install the MSCCL++ Python package](https://github.com/microsoft/mscclpp/blob/chhwang/docs/docs/quickstart.md#install-from-source-python-module) and run our Python AllReduce benchmark as follows. It requires MPI on the system. +[Install the MSCCL++ Python package](#install-from-source-python-module) and run our Python AllReduce benchmark as follows. It requires MPI on the system. ```bash # Choose `requirements_*.txt` according to your CUDA/ROCm version. @@ -163,4 +175,4 @@ mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=$MSCCLPP_BUILD/app If MSCCL++ is built on AMD platforms, `libmscclpp_nccl.so` would replace the [RCCL](https://github.com/ROCm/rccl) library (i.e., `librccl.so`). -See limitations of the current NCCL over MSCCL++ from [here](../apps/nccl/README.md#limitations). +See limitations of the current NCCL over MSCCL++ from [here](../design/nccl-over-mscclpp.md#limitations). diff --git a/docs/getting-started/tutorials/customized-proxy-service.md b/docs/getting-started/tutorials/customized-proxy-service.md new file mode 100644 index 000000000..232f81066 --- /dev/null +++ b/docs/getting-started/tutorials/customized-proxy-service.md @@ -0,0 +1 @@ +# Customize the Proxy Service diff --git a/docs/getting-started/tutorials/index.rst b/docs/getting-started/tutorials/index.rst new file mode 100644 index 000000000..7ee91b194 --- /dev/null +++ b/docs/getting-started/tutorials/index.rst @@ -0,0 +1,16 @@ +Tutorials +---------- + +This tutorial section provides a step-by-step guide to help you get started with the C++/Python API. + +.. toctree:: + :maxdepth: 1 + :caption: Tutorials + :hidden: + + initialization + proxy-channel + sm-channel + packet-api + customized-proxy-service + python-api diff --git a/docs/getting-started/tutorials/initialization.md b/docs/getting-started/tutorials/initialization.md new file mode 100644 index 000000000..0bdd8ad45 --- /dev/null +++ b/docs/getting-started/tutorials/initialization.md @@ -0,0 +1,71 @@ +# Commnunication initialize with mscclpp API + +In this tutorial, you will write a simple program to initialize communication between eight GPUs using MSCCL++ C++ API. You will also learn how to use the Python API to initialize communication. + +## Prerequisites +A system with eight GPUs is required to run this tutorial. + +Also make sure that you have installed MSCCL++ on your system. If not, please follow the [quick start](../quickstart.md). + +## Initialize Communication with C++ API +We will setup a mesh topology with eight GPUs. Each GPU will be connected to its neighbors. The following code shows how to initialize communication with MSCCL++ C++ API. + +```cpp +#include +#include +#include + +#include +#include +#include + +template +using DeviceHandle = mscclpp::DeviceHandle; +__constant__ DeviceHandle constProxyChans[8]; + +void setupMeshTopology(int rank, int worldsize, void* data, size_t dataSize) { + std::string ip_port = "10.0.0.4:50000"; + auto bootstrap = std::make_shared(rank, worldsize); + bootstrap->initialize(ip_port); + mscclpp::Communicator comm(bootstrap); + mscclpp::ProxyService proxyService; + + std::vector semaphoreIds; + std::vector localMemories; + std::vector>> connections(world_size); + std::vector> remoteMemories; + + for (int r = 0; r < world_size; ++r) { + if (r == rank) continue; + mscclpp::Transport transport = mscclpp::Transport::CudaIpc; + // Connect with all other ranks + connections[r] = comm.connectOnSetup(r, 0, transport); + auto memory = comm.registerMemory(data, dataSize, mscclpp::Transport::CudaIpc | ibTransport); + localMemories.push_back(memory); + comm.sendMemoryOnSetup(memory, r, 0); + remoteMemories.push_back(comm.recvMemoryOnSetup(r, 0)); + } + + comm.setup(); + + for (int r = 0; r < world_size; ++r) { + if (r == rank) continue; + semaphoreIds.push_back(proxyService.buildAndAddSemaphore(comm, connections[r].get())); + } + + comm.setup(); + + std::vector> proxyChannels; + for (size_t i = 0; i < semaphoreIds.size(); ++i) { + proxyChannels.push_back(mscclpp::deviceHandle(mscclpp::SimpleProxyChannel( + proxyService.proxyChannel(semaphoreIds[i]), proxyService.addMemory(remoteMemories[i].get()), + proxyService.addMemory(localMemories[i])))); + } + + if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { + std::runtime_error("unexpected error"); + } + CUDACHECK(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(), + sizeof(DeviceHandle) * proxyChannels.size())); +} +``` diff --git a/docs/getting-started/tutorials/packet-api.md b/docs/getting-started/tutorials/packet-api.md new file mode 100644 index 000000000..8f4ea7074 --- /dev/null +++ b/docs/getting-started/tutorials/packet-api.md @@ -0,0 +1 @@ +# Packet API for latency sensitive applications diff --git a/docs/getting-started/tutorials/proxy-channel.md b/docs/getting-started/tutorials/proxy-channel.md new file mode 100644 index 000000000..fec5c4cc0 --- /dev/null +++ b/docs/getting-started/tutorials/proxy-channel.md @@ -0,0 +1,3 @@ +# Offload commnunication to CPU with ProxyChannel + +TBU diff --git a/docs/getting-started/tutorials/python-api.md b/docs/getting-started/tutorials/python-api.md new file mode 100644 index 000000000..9e6c5627b --- /dev/null +++ b/docs/getting-started/tutorials/python-api.md @@ -0,0 +1,92 @@ +# Working with Python API + +We provide Python API which help to initialze and setup the channel easily. +In this tutorial, you will write a simple program to initialize communication between eight GPUs using MSCCL++ Python API. + +## Setup Channel with Python API + +We will setup a mesh topology with eight GPUs. Each GPU will be connected to its neighbors. The following code shows how to initialize communication with MSCCL++ Python API. +```python +from mpi4py import MPI +import cupy as cp + +from mscclpp import ( + ProxyService, + Transport, +) +import mscclpp.comm as mscclpp_comm + +def create_connection(group: mscclpp_comm.CommGroup, transport: str): + remote_nghrs = list(range(group.nranks)) + remote_nghrs.remove(group.my_rank) + if transport == "NVLink": + tran = Transport.CudaIpc + elif transport == "IB": + tran = group.my_ib_device(group.my_rank % 8) + else: + assert False + connections = group.make_connection(remote_nghrs, tran) + return connections + +if __name__ == "__main__": + mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD) + connections = create_connection(mscclpp_group, "NVLink") + nelems = 1024 + memory = cp.zeros(nelem, dtype=cp.int32) + proxy_service = ProxyService() + simple_channels = group.make_proxy_channels(proxy_service, memory, connections) + proxy_service.start_proxy() + mscclpp_group.barrier() + launch_kernel(mscclpp_group.my_rank, mscclpp_group.nranks, simple_channels, memory) + cp.cuda.runtime.deviceSynchronize() + mscclpp_group.barrier() +``` + +### Launch Kernel with Python API +We provide some Python utils to help you launch kernel via python. Here is a exampl. +```python +from mscclpp.utils import KernelBuilder, pack + +def launch_kernel(my_rank: int, nranks: int, simple_channels: List[SimpleProxyChannel], memory: cp.ndarray): + file_dir = os.path.dirname(os.path.abspath(__file__)) + kernel = KernelBuilder(file="test.cu", kernel_name="test", file_dir=file_dir).get_compiled_kernel() + params = b"" + first_arg = next(iter(simple_channels.values())) + size_of_channels = len(first_arg.device_handle().raw) + device_handles = [] + for rank in range(nranks): + if rank == my_rank: + device_handles.append( + bytes(size_of_channels) + ) # just zeros for semaphores that do not exist + else: + device_handles.append(simple_channels[rank].device_handle().raw) + # keep a reference to the device handles so that they don't get garbage collected + d_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8) + params = pack(d_channels, my_rank, nranks, memory.size) + + nblocks = 1 + nthreads = 512 + kernel.launch_kernel(params, nblocks, nthreads, 0, None) +``` + +The test kernel is defined in `test.cu` as follows: +```cuda +#include +#include + +// be careful about using channels[my_rank] as it is inavlie and it is there just for simplicity of indexing +extern "C" __global__ void __launch_bounds__(1024, 1) + simple_proxy_channel(mscclpp::SimpleProxyChannelDeviceHandle* channels, int my_rank, int nranks, + int num_elements) { + int tid = threadIdx.x; + int nthreads = blockDim.x; + uint64_t size_per_rank = (num_elements * sizeof(int)) / nranks; + uint64_t my_offset = size_per_rank * my_rank; + __syncthreads(); + if (tid < nranks && tid != my_rank) { + channels[tid].putWithSignalAndFlush(my_offset, my_offset, size_per_rank); + channels[tid].wait(); + } +} +``` diff --git a/docs/getting-started/tutorials/sm-channel.md b/docs/getting-started/tutorials/sm-channel.md new file mode 100644 index 000000000..191e47b36 --- /dev/null +++ b/docs/getting-started/tutorials/sm-channel.md @@ -0,0 +1,3 @@ +# Using SmChannel for Intra-Node Communication + +TBU diff --git a/docs/index.rst b/docs/index.rst index ba060047c..dc5604364 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,11 +6,56 @@ Welcome to MSCCL++'s documentation! =================================== +MSCCL++ is a GPU-driven communication stack for scalable AI applications. It is designed to provide a high-performance, scalable, and customizable communication stack for distributed GPU applications. + +Getting Started +--------------- +- Follow the :doc:`quick start ` for your platform of choice. +- Take a look at the :doc:`tutorials ` to learn how to write your first mscclpp program. + +.. toctree:: + :maxdepth: 1 + :caption: Getting Started + :hidden: + + getting-started/quickstart + getting-started/tutorials/index + +Design +------- +- :doc:`Design ` doc for those who want to understand the internals of MSCCL++. +- :doc:`NCCL over MSCCL++ ` doc for those who want to understand how to use NCCL over MSCCL++. + +.. toctree:: + :maxdepth: 1 + :caption: Design + :hidden: + + design/design + design/nccl-over-mscclpp + +Performance +--------------- +- We evaluate the performance of MSCCL++ in A100 and H100. Here are some :doc:`performance results ` for all-reduce operations. + .. toctree:: - :maxdepth: 2 - :caption: Contents: + :maxdepth: 1 + :caption: Performance + :hidden: + + performance/performance-ndmv4 + +C++ API +--------------- +- :doc:`mscclpp ` +.. toctree:: + :maxdepth: 1 + :caption: C++ API + :hidden: + + api/index Indices and tables ================== @@ -18,9 +63,3 @@ Indices and tables * :ref:`genindex` * :ref:`modindex` * :ref:`search` - -Docs -==== - -.. doxygennamespace:: mscclpp - :members: diff --git a/docs/performance-ndmv4.md b/docs/performance/performance-ndmv4.md similarity index 100% rename from docs/performance-ndmv4.md rename to docs/performance/performance-ndmv4.md diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 000000000..82bb70d03 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,3 @@ +breathe +sphinx_rtd_theme +myst_parser diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp index 52c3c6da9..e994548e4 100644 --- a/include/mscclpp/executor.hpp +++ b/include/mscclpp/executor.hpp @@ -43,7 +43,7 @@ class Executor { ~Executor(); void execute(int rank, void* sendbuff, void* recvBuff, size_t sendBuffSize, size_t recvBuffSize, DataType dataType, - int nthreads, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType = PacketType::LL16); + const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType = PacketType::LL16); private: struct Impl; diff --git a/include/mscclpp/npkit/npkit_event.hpp b/include/mscclpp/npkit/npkit_event.hpp index da0206c0f..1a24b241f 100644 --- a/include/mscclpp/npkit/npkit_event.hpp +++ b/include/mscclpp/npkit/npkit_event.hpp @@ -13,6 +13,6 @@ #define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4 #define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5 -#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x15 +#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x17 #endif diff --git a/include/mscclpp/packet_device.hpp b/include/mscclpp/packet_device.hpp index 532676d43..d7f2ee8a0 100644 --- a/include/mscclpp/packet_device.hpp +++ b/include/mscclpp/packet_device.hpp @@ -49,8 +49,6 @@ union alignas(16) LL16Packet { #else // !defined(MSCCLPP_DEVICE_CUDA) uint4 reg = make_uint4(val1, flag, val2, flag); ulonglong2* p = reinterpret_cast(®); - /*atomicStore(&(raw_.x), p->x, memoryOrderRelaxed); - atomicStore(&(raw_.y), p->y, memoryOrderRelaxed);*/ __builtin_nontemporal_store(p->x, &(raw_.x)); __builtin_nontemporal_store(p->y, &(raw_.y)); #endif diff --git a/python/mscclpp/executor_py.cpp b/python/mscclpp/executor_py.cpp index dadbf40f6..c550ecb00 100644 --- a/python/mscclpp/executor_py.cpp +++ b/python/mscclpp/executor_py.cpp @@ -29,11 +29,10 @@ void register_executor(nb::module_& m) { .def( "execute", [](Executor* self, int rank, uintptr_t sendbuff, uintptr_t recvBuff, size_t sendBuffSize, size_t recvBuffSize, - DataType dataType, int nthreads, const ExecutionPlan& plan, uintptr_t stream, PacketType packetType) { + DataType dataType, const ExecutionPlan& plan, uintptr_t stream, PacketType packetType) { self->execute(rank, reinterpret_cast(sendbuff), reinterpret_cast(recvBuff), sendBuffSize, - recvBuffSize, dataType, nthreads, plan, (cudaStream_t)stream, packetType); + recvBuffSize, dataType, plan, (cudaStream_t)stream, packetType); }, nb::arg("rank"), nb::arg("sendbuff"), nb::arg("recvBuff"), nb::arg("sendBuffSize"), nb::arg("recvBuffSize"), - nb::arg("dataType"), nb::arg("nthreads"), nb::arg("plan"), nb::arg("stream"), - nb::arg("packetType") = PacketType::LL16); + nb::arg("dataType"), nb::arg("plan"), nb::arg("stream"), nb::arg("packetType") = PacketType::LL16); } diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 23c3ff483..5fd59f2bb 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -77,11 +77,20 @@ def dtype_to_mscclpp_dtype(dtype): raise ValueError(f"Unknown data type: {dtype}") +def determine_result_buf(sendbuf, recvbuf, in_place, execution_plan_name): + if "allgather" in execution_plan_name: + return recvbuf + elif in_place: + return sendbuf + else: + return recvbuf + + def main( - execution_paln_name: str, + execution_plan_name: str, execution_plan_path: str, size: int, - nthreads_per_block: int, + in_place: bool = True, dtype: cp.dtype = cp.float16, packet_type: PacketType = PacketType.LL16, seed: int = 42, @@ -92,26 +101,36 @@ def main( npkit_dump_dir = os.getenv("NPKIT_DUMP_DIR") if npkit_dump_dir is not None: npkit.init(mscclpp_group.my_rank) - execution_plan = ExecutionPlan(execution_paln_name, execution_plan_path) + execution_plan = ExecutionPlan(execution_plan_name, execution_plan_path) cp.random.seed(seed) nelems = size // cp.dtype(dtype).itemsize - buffer = cp.random.random(nelems * mscclpp_group.nranks).astype(dtype) + buffer = cp.random.random(nelems * mscclpp_group.nranks, dtype=cp.float32).astype(dtype) sub_arrays = cp.split(buffer, MPI.COMM_WORLD.size) - sendbuf = sub_arrays[MPI.COMM_WORLD.rank] - expected = cp.zeros_like(sendbuf) - for i in range(mscclpp_group.nranks): - expected += sub_arrays[i] + sendbuf = cp.zeros(nelems, dtype=dtype) + for i in range(nelems): + sendbuf[i] = sub_arrays[MPI.COMM_WORLD.rank][i] + + if "allgather" in execution_plan_name: + recvbuf = cp.zeros(nelems * mscclpp_group.nranks, dtype=dtype) + if in_place: + for i in range(nelems): + recvbuf[mscclpp_group.my_rank * nelems + i] = sendbuf[i] + expected = buffer + else: + recvbuf = cp.zeros(nelems, dtype=dtype) + expected = cp.zeros_like(sendbuf, dtype=dtype) + for i in range(mscclpp_group.nranks): + expected += sub_arrays[i] mscclpp_group.barrier() executor_func = lambda stream: executor.execute( MPI.COMM_WORLD.rank, sendbuf.data.ptr, - sendbuf.data.ptr, - sendbuf.nbytes, + determine_result_buf(sendbuf, recvbuf, in_place, execution_plan_name).data.ptr, sendbuf.nbytes, + determine_result_buf(sendbuf, recvbuf, in_place, execution_plan_name).nbytes, dtype_to_mscclpp_dtype(dtype), - nthreads_per_block, execution_plan, stream.ptr, packet_type, @@ -120,17 +139,22 @@ def main( stream = cp.cuda.Stream(non_blocking=True) executor_func(stream) stream.synchronize() - assert cp.allclose(sendbuf, expected, atol=1e-2 * mscclpp_group.nranks) + + assert cp.allclose( + determine_result_buf(sendbuf, recvbuf, in_place, execution_plan_name), + expected, + atol=1e-2 * mscclpp_group.nranks, + ) mscclpp_group.barrier() - execution_time = bench_time(100, 10, executor_func) + execution_time = bench_time(10, 10, executor_func) if npkit_dump_dir is not None: npkit.dump(npkit_dump_dir) npkit.shutdown() print( f"Rank: {MPI.COMM_WORLD.rank} Execution time: {execution_time} us, " f"data size: {sendbuf.nbytes} bytes data type: {dtype().dtype.name} " - f"packet type: {packet_type} nthreads_per_block: {nthreads_per_block}" + f"packet type: {packet_type}" ) executor = None mscclpp_group = None @@ -141,7 +165,7 @@ def main( parser.add_argument("-n", "--execution_plan_name", type=str, required=True) parser.add_argument("-path", "--execution_plan_path", type=str, required=True) parser.add_argument("--size", type=str, required=True) - parser.add_argument("--nthreads_per_block", type=int, required=True) + parser.add_argument("--in_place", action="store_true", help="flag to define an in-place operation") parser.add_argument("--dtype", type=str, default="float16", help="Choose from float16, float32, int32") parser.add_argument("--packet_type", type=str, default="LL16", help="Choose from LL8, LL16") parser.add_argument("--seed", type=int, default=42) @@ -157,7 +181,7 @@ def main( args.execution_plan_name, args.execution_plan_path, buffer_size, - args.nthreads_per_block, + args.in_place, dtype, packet_type, args.seed, diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index 9535c869f..1be0b1821 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -630,7 +630,6 @@ def test_executor(mpi_group: MpiGroup, filename: str): sendbuf.nbytes, sendbuf.nbytes, DataType.float16, - 512, execution_plan, stream.ptr, ) diff --git a/src/bootstrap/socket.cc b/src/bootstrap/socket.cc index 9e5913403..6377bc6de 100644 --- a/src/bootstrap/socket.cc +++ b/src/bootstrap/socket.cc @@ -1,8 +1,6 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ +// Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. +// Modifications Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. #include "socket.h" diff --git a/src/connection.cc b/src/connection.cc index 57e77b40b..79c4c9630 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -16,10 +16,13 @@ namespace mscclpp { -void validateTransport(RegisteredMemory mem, Transport transport) { +void validateTransport(RegisteredMemory mem, Transport transport, uint64_t offset = 0, uint64_t size = 0) { if (!mem.transports().has(transport)) { throw Error("RegisteredMemory does not support this transport", ErrorCode::InvalidUsage); } + if (offset + size > mem.size()) { + throw Error("RegisteredMemory out of bounds", ErrorCode::InvalidUsage); + } } // Connection @@ -59,8 +62,8 @@ Transport CudaIpcConnection::remoteTransport() { return Transport::CudaIpc; } void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) { - validateTransport(dst, remoteTransport()); - validateTransport(src, transport()); + validateTransport(dst, remoteTransport(), dstOffset, size); + validateTransport(src, transport(), srcOffset, size); char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); @@ -115,8 +118,8 @@ Transport IBConnection::remoteTransport() { return remoteTransport_; } void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) { - validateTransport(dst, remoteTransport()); - validateTransport(src, transport()); + validateTransport(dst, remoteTransport(), dstOffset, size); + validateTransport(src, transport(), srcOffset, size); auto dstTransportInfo = getImpl(dst)->getTransportInfo(remoteTransport()); if (dstTransportInfo.ibLocal) { @@ -231,8 +234,8 @@ Transport EthernetConnection::remoteTransport() { return Transport::Ethernet; } void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) { // Validating Transport Protocol - validateTransport(dst, remoteTransport()); - validateTransport(src, transport()); + validateTransport(dst, remoteTransport(), dstOffset, size); + validateTransport(src, transport(), srcOffset, size); // Initializing Variables char* srcPtr = reinterpret_cast(src.data()) + srcOffset / sizeof(char); diff --git a/src/debug.cc b/src/debug.cc index aa97b09db..dea9ee713 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -1,8 +1,6 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ +// Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. +// Modifications Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. #include "debug.h" diff --git a/src/endpoint.cc b/src/endpoint.cc index 35817793f..015d51a60 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + #include "endpoint.hpp" #include diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 8ebf510e2..09ebc6d8f 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -148,12 +148,19 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c } return std::vector(bufferTypes.begin(), bufferTypes.end()); } -size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize) const { +size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const { + size_t sizePerRank; + if (this->inputChunks.at(rank) != 0) + sizePerRank = inputSize / this->inputChunks.at(rank); + else if (this->outputChunks.at(rank) != 0) + sizePerRank = outputSize / this->outputChunks.at(rank); + else + throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + if (this->isUsingPacket) { - return inputSize / this->inputChunks.at(rank) * this->scratchChunks.at(rank) * 2 /* data + flag*/ * - 2 /*double buffer*/; + return sizePerRank * this->scratchChunks.at(rank) * 2 /* data + flag*/ * 2 /*double buffer*/; } - return inputSize / this->inputChunks.at(rank) * this->scratchChunks.at(rank); + return sizePerRank * this->scratchChunks.at(rank); } std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { return this->operations.at(rank)[threadblock]; @@ -161,7 +168,10 @@ std::vector ExecutionPlan::Impl::getOperations(int rank, int threadbl int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->operations.at(rank).size(); } -void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset) { +int ExecutionPlan::Impl::getNThreadsPerBlock() const { return this->nThreadsPerBlock; } + +void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, + size_t constDstOffset) { std::ifstream file(this->planPath); json obj = json::parse(file); if (this->name != obj["name"]) { @@ -171,6 +181,7 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t contsSrcOff if (protocol == "LL") { this->isUsingPacket = true; } + this->nThreadsPerBlock = obj.value("num_threads_per_block", 1024); const auto& gpus = obj["gpus"]; for (const auto& gpu : gpus) { @@ -183,10 +194,12 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t contsSrcOff this->setupChannels(gpus); this->inputSize = inputSize; + this->outputSize = outputSize; this->setupOperations(gpus, contsSrcOffset, constDstOffset); } -void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset) { +void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, + size_t constDstOffset) { std::ifstream file(this->planPath); json obj = json::parse(file); if (this->name != obj["name"]) { @@ -207,6 +220,7 @@ void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t contsS } this->inputSize = inputSize; + this->outputSize = outputSize; this->setupOperations(gpus, contsSrcOffset, constDstOffset); } @@ -310,8 +324,9 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse // Get the relevant channel index in rank channelInfos operation.inputChannelIndexes[i] = channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]]; - operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["i_cids"][i]["off"]) + - (srcBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); + operation.inputOffsets[i] = + this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["i_cids"][i]["off"]) + + (srcBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); chunkIndexes.push_back((uint32_t)op["i_cids"][i]["off"]); } } @@ -320,8 +335,9 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse operation.nInputs = op["srcs"].size(); operation.inputBufferType = convertToBufferType(op["srcs"][0]["buff"]); for (int i = 0; i < operation.nInputs; i++) { - operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["srcs"][i]["off"]) + - (operation.inputBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); + operation.inputOffsets[i] = + this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcs"][i]["off"]) + + (operation.inputBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); chunkIndexes.push_back((uint32_t)op["srcs"][i]["off"]); } } @@ -332,8 +348,9 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse BufferType dstBufferType = convertToBufferType(op["o_buff"]["dst"]); operation.outputChannelIndexes[i] = channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]]; - operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["o_cids"][i]["off"]) + - (dstBufferType != BufferType::SCRATCH ? constDstOffset : 0); + operation.outputOffsets[i] = + this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["o_cids"][i]["off"]) + + (dstBufferType != BufferType::SCRATCH ? constDstOffset : 0); chunkIndexes.push_back((uint32_t)op["o_cids"][i]["off"]); } } @@ -342,8 +359,9 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse operation.nOutputs = op["dsts"].size(); operation.outputBufferType = convertToBufferType(op["dsts"][0]["buff"]); for (int i = 0; i < operation.nOutputs; i++) { - operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["dsts"][i]["off"]) + - (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : 0); + operation.outputOffsets[i] = + this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dsts"][i]["off"]) + + (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : 0); chunkIndexes.push_back((uint32_t)op["dsts"][i]["off"]); } } @@ -351,18 +369,19 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse operation.srcBufferType = convertToBufferType(op["srcbuff"]); } if (op.contains("srcoff")) { - operation.srcOffset = this->getOffset(rank, this->inputSize, (uint32_t)op["srcoff"]); + operation.srcOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcoff"]); chunkIndexes.push_back((uint32_t)op["srcoff"]); } if (op.contains("dstbuff")) { operation.dstBufferType = convertToBufferType(op["dstbuff"]); } if (op.contains("dstoff")) { - operation.dstOffset = this->getOffset(rank, this->inputSize, (uint32_t)op["dstoff"]); + operation.dstOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dstoff"]); chunkIndexes.push_back((uint32_t)op["dstoff"]); } if (op.contains("cnt")) { - operation.size = this->getNChunkSize(rank, this->inputSize, (uint32_t)op["cnt"], chunkIndexes); + operation.size = + this->getNChunkSize(rank, this->inputSize, this->outputSize, (uint32_t)op["cnt"], chunkIndexes); } ops.push_back(operation); } @@ -371,14 +390,33 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse } } -size_t ExecutionPlan::Impl::getOffset(int rank, size_t inputSize, uint32_t chunkIndex, uint32_t alignment) const { +std::pair ExecutionPlan::Impl::calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const { + std::pair sizePerRank; + if (this->inputChunks.at(rank) == 0 && this->outputChunks.at(rank) == 0) { + throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + } else if (this->inputChunks.at(rank) != 0 && this->outputChunks.at(rank) != 0) { + if (inputSize / this->inputChunks.at(rank) != outputSize / this->outputChunks.at(rank)) + throw mscclpp::Error("Size per chunks inconsistent", mscclpp::ErrorCode::ExecutorError); + else + sizePerRank = std::make_pair(inputSize, this->inputChunks.at(rank)); + } else if (this->inputChunks.at(rank) != 0) { + sizePerRank = std::make_pair(inputSize, this->inputChunks.at(rank)); + } else if (this->outputChunks.at(rank) != 0) { + sizePerRank = std::make_pair(outputSize, this->outputChunks.at(rank)); + } + return sizePerRank; +} + +size_t ExecutionPlan::Impl::getOffset(int rank, size_t inputSize, size_t outputSize, uint32_t chunkIndex, + uint32_t alignment) const { if (inputSize % alignment != 0) { throw Error("inputSize must be a multiple of alignment", ErrorCode::ExecutorError); } const int nGroups = this->chunkGroups.at(rank); - uint32_t nInputChunks = this->inputChunks.at(rank); - uint32_t nelems = inputSize / (alignment * sizeof(uint8_t)); + auto sizePerRank = calcSizePerRank(rank, inputSize, outputSize); + uint32_t nInputChunks = sizePerRank.second; + uint32_t nelems = sizePerRank.first / (alignment * sizeof(uint8_t)); if (nelems % nGroups != 0) { throw Error("Input size must be a multiple of nGroups", ErrorCode::ExecutorError); } @@ -394,12 +432,12 @@ size_t ExecutionPlan::Impl::getOffset(int rank, size_t inputSize, uint32_t chunk return static_cast(offset) * alignment; } -size_t ExecutionPlan::Impl::getNChunkSize(int rank, size_t inputSize, uint32_t nChunks, +size_t ExecutionPlan::Impl::getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks, const std::vector chunkIndexes) const { size_t nChunkSize = 0; for (uint32_t index : chunkIndexes) { - uint32_t beginOff = getOffset(rank, inputSize, index); - uint32_t endOff = getOffset(rank, inputSize, index + nChunks); + uint32_t beginOff = getOffset(rank, inputSize, outputSize, index); + uint32_t endOff = getOffset(rank, inputSize, outputSize, index + nChunks); if (nChunkSize == 0) { nChunkSize = endOff - beginOff; } else if (nChunkSize != endOff - beginOff) { diff --git a/src/executor/executor.cc b/src/executor/executor.cc index f932316a2..9de8a58be 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -65,6 +65,7 @@ struct ExecutionContext { std::shared_ptr scratchBuffer; size_t scratchBufferSize; std::shared_ptr deviceExecutionPlansBuffer; + int nthreadsPerBlock; }; struct Executor::Impl { @@ -79,13 +80,13 @@ struct Executor::Impl { } ~Impl() = default; - ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t messageSize, - size_t contsSrcOffset, size_t constDstOffset, size_t sendBufferSize, - size_t recvBufferSize, const ExecutionPlan& plan) { + ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize, + size_t outputMessageSize, size_t contsSrcOffset, size_t constDstOffset, + size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) { ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name}; if (this->contexts.find(key) != this->contexts.end()) { plan.impl_->operationsReset(); - plan.impl_->lightLoadExecutionPlan(messageSize, contsSrcOffset, constDstOffset); + plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset); this->setupDeviceExecutionPlan(this->contexts[key], rank, plan); this->contexts[key].deviceExecutionPlansBuffer = allocExtSharedCuda(this->contexts[key].deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan)); @@ -96,17 +97,18 @@ struct Executor::Impl { } plan.impl_->reset(); - plan.impl_->loadExecutionPlan(messageSize, contsSrcOffset, constDstOffset); + plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset); ExecutionContext context; - size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize); + size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize); std::shared_ptr scratchBuffer = allocExtSharedCuda(scratchBufferSize); context.scratchBuffer = scratchBuffer; context.scratchBufferSize = scratchBufferSize; context.proxyService = std::make_shared(); + context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock(); this->setupConnections(context, rank, plan); this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); - this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, rank, plan); + this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); this->setupDeviceExecutionPlan(context, rank, plan); context.deviceExecutionPlansBuffer = allocExtSharedCuda(context.deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan)); @@ -117,6 +119,23 @@ struct Executor::Impl { return context; } + TransportFlags getTransportFlags(std::vector& infos, int rank) { + TransportFlags flags; + for (ChannelInfo& info : infos) { + if (info.channelType == ChannelType::SM) { + flags |= Transport::CudaIpc; + } else if (info.channelType == ChannelType::PROXY) { + for (int peer : info.connectedPeers) { + if (!inSameNode(rank, peer, this->nranksPerNode)) { + flags |= IBs[rank % this->nranksPerNode]; + } else + flags |= Transport::CudaIpc; + } + } + } + return flags; + }; + void setupConnections(ExecutionContext& context, int rank, const ExecutionPlan& plan) { std::vector connectedPeers = plan.impl_->getConnectedPeers(rank); std::vector>> connectionFutures; @@ -133,22 +152,6 @@ struct Executor::Impl { void setupRegisteredMemories(ExecutionContext& context, void* sendbuff, void* recvbuff, size_t sendBufferSize, size_t recvBufferSize, int rank, const ExecutionPlan& plan) { - auto getTransportFlags = [&](std::vector& infos, int rank) { - TransportFlags flags; - for (ChannelInfo& info : infos) { - if (info.channelType == ChannelType::SM) { - flags |= Transport::CudaIpc; - } else if (info.channelType == ChannelType::PROXY) { - for (int peer : info.connectedPeers) { - if (!inSameNode(rank, peer, this->nranksPerNode)) { - flags |= IBs[rank % this->nranksPerNode]; - } else - flags |= Transport::CudaIpc; - } - } - } - return flags; - }; auto getBufferInfo = [&](BufferType type) { switch (type) { case BufferType::INPUT: @@ -190,22 +193,12 @@ struct Executor::Impl { comm->setup(); for (size_t i = 0; i < remoteRegMemoryFutures.size(); i++) { context.registeredMemories[{bufferType, connectedPeers[i]}] = std::move(remoteRegMemoryFutures[i].get()); - CUdeviceptr myRegBaseAdr, peerRegBaseAdr; - size_t temp; - MSCCLPP_CUTHROW(cuMemGetAddressRange(&myRegBaseAdr, &temp, (CUdeviceptr)(char*)memory.data())); - MSCCLPP_CUTHROW(cuMemGetAddressRange( - &peerRegBaseAdr, &temp, - (CUdeviceptr)(char*)context.registeredMemories[{bufferType, connectedPeers[i]}].data())); - size_t myRegOffset = (char*)memory.data() - (char*)myRegBaseAdr; - size_t peerRegOffset = - (char*)context.registeredMemories[{bufferType, connectedPeers[i]}].data() - (char*)peerRegBaseAdr; - if (myRegOffset != peerRegOffset) throw Error("Divergent data offset between peers", ErrorCode::ExecutorError); } } } - void setupChannels(ExecutionContext& context, void* sendbuff, void* recvbuff, size_t sendBufferSize, int rank, - const ExecutionPlan& plan) { + void setupChannels(ExecutionContext& context, void* sendbuff, void* recvbuff, size_t sendBufferSize, + size_t recvBufferSize, int rank, const ExecutionPlan& plan) { const auto channelTypes = {ChannelType::SM, ChannelType::PROXY}; std::vector> smSemaphores; std::vector proxySemaphores; @@ -249,13 +242,27 @@ struct Executor::Impl { throw Error("Invalid buffer type", ErrorCode::ExecutorError); } }; + auto getBufferSize = [&](BufferType type) { + switch (type) { + case BufferType::INPUT: + return sendBufferSize; + case BufferType::OUTPUT: + return recvBufferSize; + case BufferType::SCRATCH: + return context.scratchBufferSize; + default: + throw Error("Invalid buffer type", ErrorCode::ExecutorError); + } + }; + for (ChannelType channelType : channelTypes) { std::vector channelInfos = plan.impl_->getChannelInfos(rank, channelType); int index = 0; for (ChannelInfo& info : channelInfos) { void* src = getBuffer(info.srcBufferType); - TransportFlags transport = context.registeredMemories.begin()->second.transports(); - RegisteredMemory localMemory = this->comm->registerMemory(src, sendBufferSize, transport); + size_t bufferSize = getBufferSize(info.srcBufferType); + TransportFlags transport = getTransportFlags(channelInfos, rank); + RegisteredMemory localMemory = this->comm->registerMemory(src, bufferSize, transport); for (int peer : info.connectedPeers) { if (channelType == ChannelType::SM) { context.smChannels.emplace_back(context.smSemaphores[index++], @@ -295,8 +302,8 @@ struct Executor::Impl { context.deviceExecutionPlans = std::move(deviceExecutionPlans); } - void launchKernel(ExecutionContext& context, int rank, int nthreadsPerBlock, void* sendbuff, void* recvbuff, - DataType dataType, cudaStream_t stream, PacketType packetType) { + void launchKernel(ExecutionContext& context, int rank, void* sendbuff, void* recvbuff, DataType dataType, + cudaStream_t stream, PacketType packetType) { static uint32_t flag = 0; int nthreadblocks = context.deviceExecutionPlans.size(); #if defined(ENABLE_NPKIT) @@ -315,13 +322,13 @@ struct Executor::Impl { switch (packetType) { case PacketType::LL16: ExecutionKernel::launchKernel( - rank, nthreadblocks, nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), + rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), context.scratchBufferSize, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, ++flag); break; case PacketType::LL8: ExecutionKernel::launchKernel( - rank, nthreadblocks, nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), + rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), context.scratchBufferSize, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, ++flag); break; @@ -334,7 +341,7 @@ struct Executor::Impl { Executor::Executor(std::shared_ptr comm) : impl_(std::make_unique(comm)) {} void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize, - [[maybe_unused]] size_t recvBuffSize, DataType dataType, int nthreads, const ExecutionPlan& plan, + [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType) { size_t sendBytes, recvBytes; CUdeviceptr sendBasePtr, recvBasePtr; @@ -343,10 +350,10 @@ void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuff size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr; size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr; - ExecutionContext context = this->impl_->setupExecutionContext( - rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, offsetIn, offsetOut, sendBytes, recvBytes, plan); - // TODO(binyli): need to flush proxy channel here - this->impl_->launchKernel(context, rank, nthreads, sendbuff, recvbuff, dataType, stream, packetType); + ExecutionContext context = + this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize, + offsetIn, offsetOut, sendBytes, recvBytes, plan); + this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType); } Executor::~Executor() = default; diff --git a/src/include/debug.h b/src/include/debug.h index c3cc9f36d..1abbad340 100644 --- a/src/include/debug.h +++ b/src/include/debug.h @@ -1,8 +1,6 @@ -/************************************************************************* - * Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ +// Copyright (c) 2015-2022, NVIDIA CORPORATION. All rights reserved. +// Modifications Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. #ifndef MSCCLPP_DEBUG_H_ #define MSCCLPP_DEBUG_H_ diff --git a/src/include/execution_common.hpp b/src/include/execution_common.hpp index 87d07f0b4..99bf36a4f 100644 --- a/src/include/execution_common.hpp +++ b/src/include/execution_common.hpp @@ -31,6 +31,8 @@ enum class OperationType : uint8_t { BARRIER, PUT, PUT_PACKET, + PUT_WITH_SIGNAL, + PUT_WITH_SIGNAL_AND_FLUSH, GET, COPY, COPY_PACKET, @@ -44,8 +46,6 @@ enum class OperationType : uint8_t { REDUCE_SEND_PACKET, READ_REDUCE_COPY, READ_REDUCE_COPY_SEND, - PUT_WITH_SIGNAL, - PUT_WITH_SIGNAL_AND_FLUSH, }; struct Channels { diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 79ab4af0b..a44962782 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -65,12 +65,13 @@ struct ExecutionPlan::Impl { std::vector getUnpairedChannelInfos(int rank, int worldSize, ChannelType channelType); std::vector getConnectedPeers(int rank) const; std::vector getConnectedBufferTypes(int rank) const; - size_t getScratchBufferSize(int rank, size_t inputSize) const; + size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const; std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; + int getNThreadsPerBlock() const; - void loadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset); - void lightLoadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset); + void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset); + void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset); void setupChannels(const nlohmann::json& gpus); void setupOperations(const nlohmann::json& gpus, size_t contsSrcOffset, size_t constDstOffset); @@ -93,10 +94,14 @@ struct ExecutionPlan::Impl { std::unordered_map scratchChunks; std::unordered_map chunkGroups; size_t inputSize; + size_t outputSize; + int nThreadsPerBlock; private: - size_t getOffset(int rank, size_t inputSize, uint32_t chunkIndex, uint32_t alignment = 16) const; - size_t getNChunkSize(int rank, size_t inputSize, uint32_t nChunks, const std::vector offsets) const; + std::pair calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const; + size_t getOffset(int rank, size_t inputSize, size_t outputSize, uint32_t chunkIndex, uint32_t alignment = 16) const; + size_t getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks, + const std::vector offsets) const; }; } // namespace mscclpp diff --git a/src/include/ibverbs_wrapper.hpp b/src/include/ibverbs_wrapper.hpp index e862cbea3..fe67268a8 100644 --- a/src/include/ibverbs_wrapper.hpp +++ b/src/include/ibverbs_wrapper.hpp @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + #ifndef MSCCLPP_IBVERBS_WRAPPER_HPP_ #define MSCCLPP_IBVERBS_WRAPPER_HPP_ diff --git a/src/include/utils_internal.hpp b/src/include/utils_internal.hpp index 52edd1889..3cd06bf52 100644 --- a/src/include/utils_internal.hpp +++ b/src/include/utils_internal.hpp @@ -1,8 +1,6 @@ -/************************************************************************* - * Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. - * - * See LICENSE.txt for license information - ************************************************************************/ +// Copyright (c) 2016-2022, NVIDIA CORPORATION. All rights reserved. +// Modifications Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. #ifndef MSCCLPP_UTILS_INTERNAL_HPP_ #define MSCCLPP_UTILS_INTERNAL_HPP_ diff --git a/test/execution-files/allreduce.json b/test/execution-files/allreduce.json index eb7e41940..afc921f4b 100644 --- a/test/execution-files/allreduce.json +++ b/test/execution-files/allreduce.json @@ -3,6 +3,7 @@ "colletive": "allreduce", "protocol": "Simple", "inplace": true, + "num_threads_per_block": 512, "gpus": [ { "id": 0, diff --git a/test/execution-files/allreduce_packet.json b/test/execution-files/allreduce_packet.json index b0df82c91..d35a4e96b 100644 --- a/test/execution-files/allreduce_packet.json +++ b/test/execution-files/allreduce_packet.json @@ -3,6 +3,7 @@ "colletive": "allreduce", "protocol": "LL", "inplace": true, + "num_threads_per_block": 768, "gpus": [ { "id": 0, diff --git a/test/executor_test.cc b/test/executor_test.cc index 33a795045..2f6d9cf5d 100644 --- a/test/executor_test.cc +++ b/test/executor_test.cc @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + #include #include @@ -56,16 +59,16 @@ mscclpp::PacketType parsePacketType(const char* value) { } double benchTime(int rank, std::shared_ptr bootstrap, std::shared_ptr executor, - const mscclpp::ExecutionPlan& plan, std::shared_ptr sendbuff, size_t bufferSize, - int nthreadsPerBlock, int niters, int ngrapthIters, mscclpp::PacketType packetType) { + const mscclpp::ExecutionPlan& plan, std::shared_ptr sendbuff, size_t bufferSize, int niters, + int ngrapthIters, mscclpp::PacketType packetType) { mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking); cudaGraph_t graph; cudaGraphExec_t graphExec; mscclpp::Timer timer; MSCCLPP_CUDATHROW(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal)); for (int i = 0; i < niters; i++) { - executor->execute(rank, sendbuff.get(), sendbuff.get(), bufferSize, bufferSize, mscclpp::DataType::FLOAT16, - nthreadsPerBlock, plan, stream, packetType); + executor->execute(rank, sendbuff.get(), sendbuff.get(), bufferSize, bufferSize, mscclpp::DataType::FLOAT16, plan, + stream, packetType); } MSCCLPP_CUDATHROW(cudaStreamEndCapture(stream, &graph)); MSCCLPP_CUDATHROW(cudaGraphInstantiate(&graphExec, graph, NULL, NULL, 0)); @@ -86,11 +89,10 @@ double benchTime(int rank, std::shared_ptr bootstrap, std::s } int main(int argc, char* argv[]) { - if (argc != 7 && argc != 8) { + if (argc != 6 && argc != 7) { std::cerr << "Usage: " << argv[0] << " " << " " << " " - << " " << " " << " " << " (optional) " << std::endl; @@ -107,13 +109,12 @@ int main(int argc, char* argv[]) { const size_t bufferSize = parseSize(argv[1]); const std::string executionPlanName = argv[2]; const std::string executionPlanPath = argv[3]; - const int nthreadsPerBlock = std::stoi(argv[4]); - const int niters = std::stoi(argv[5]); - const int ngraphIters = std::stoi(argv[6]); + const int niters = std::stoi(argv[4]); + const int ngraphIters = std::stoi(argv[5]); const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); mscclpp::PacketType packetType = mscclpp::PacketType::LL16; - if (argc == 8) { - packetType = parsePacketType(argv[7]); + if (argc == 7) { + packetType = parsePacketType(argv[6]); } std::shared_ptr bootstrap; @@ -133,8 +134,7 @@ int main(int argc, char* argv[]) { std::shared_ptr sendbuff = mscclpp::allocExtSharedCuda(bufferSize); std::vector dataHost(bufferSize / sizeof(int), rank); MSCCLPP_CUDATHROW(cudaMemcpy(sendbuff.get(), dataHost.data(), bufferSize, cudaMemcpyHostToDevice)); - double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, nthreadsPerBlock, niters, - ngraphIters, packetType); + double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, niters, ngraphIters, packetType); if (npkitDumpDir != nullptr) { NpKit::Dump(npkitDumpDir); diff --git a/test/mp_unit/executor_tests.cc b/test/mp_unit/executor_tests.cc index 5baa2b67a..49952b6b4 100644 --- a/test/mp_unit/executor_tests.cc +++ b/test/mp_unit/executor_tests.cc @@ -59,7 +59,7 @@ TEST_F(ExecutorTest, TwoNodesAllreduce) { const int bufferSize = 1024 * 1024; std::shared_ptr sendbuff = mscclpp::allocExtSharedCuda(bufferSize); mscclpp::CudaStreamWithFlags stream(cudaStreamNonBlocking); - executor->execute(gEnv->rank, sendbuff.get(), sendbuff.get(), bufferSize, bufferSize, mscclpp::DataType::FLOAT16, 512, + executor->execute(gEnv->rank, sendbuff.get(), sendbuff.get(), bufferSize, bufferSize, mscclpp::DataType::FLOAT16, plan, stream); MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream)); } diff --git a/test/mscclpp-test/common.cc b/test/mscclpp-test/common.cc index 9c52f9f4a..899823f7d 100644 --- a/test/mscclpp-test/common.cc +++ b/test/mscclpp-test/common.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -399,7 +400,8 @@ void BaseTestEngine::setupMeshConnectionsInternal( void BaseTestEngine::setupMeshConnections(std::vector>& proxyChannels, void* inputBuff, size_t inputBuffBytes, void* outputBuff, size_t outputBuffBytes, SetupChannelFunc setupChannel) { - const mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc | IBs[args_.gpuNum]; + mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc; + if (mscclpp::getIBDeviceCount() > 0) allTransports |= IBs[args_.gpuNum]; mscclpp::RegisteredMemory inputBufRegMem = comm_->registerMemory(inputBuff, inputBuffBytes, allTransports); mscclpp::RegisteredMemory outputBufRegMem; if (outputBuff) { @@ -429,7 +431,8 @@ void BaseTestEngine::setupMeshConnections(std::vector& smChannels, void* inputBuff, size_t inputBuffBytes, void* outputBuff, size_t outputBuffBytes, ChannelSemantic semantic, size_t nChannelPerConnection) { - const mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc | IBs[args_.gpuNum]; + mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc; + if (mscclpp::getIBDeviceCount() > 0) allTransports |= IBs[args_.gpuNum]; mscclpp::RegisteredMemory inputBufRegMem = comm_->registerMemory(inputBuff, inputBuffBytes, allTransports); mscclpp::RegisteredMemory getPacketBufRegMem; mscclpp::RegisteredMemory outputBufRegMem; @@ -469,7 +472,8 @@ void BaseTestEngine::setupMeshConnections(std::vector& smCha void* inputBuff, size_t inputBuffBytes, void* putPacketBuff, size_t putPacketBuffBytes, void* getPacketBuff, size_t getPacketBuffBytes, void* outputBuff, size_t outputBuffBytes) { - const mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc | IBs[args_.gpuNum]; + mscclpp::TransportFlags allTransports = mscclpp::Transport::CudaIpc; + if (mscclpp::getIBDeviceCount() > 0) allTransports |= IBs[args_.gpuNum]; mscclpp::RegisteredMemory inputBufRegMem = comm_->registerMemory(inputBuff, inputBuffBytes, allTransports); mscclpp::RegisteredMemory putPacketBufRegMem; mscclpp::RegisteredMemory getPacketBufRegMem; diff --git a/test/unit/utils_internal_tests.cc b/test/unit/utils_internal_tests.cc index 6ae04561b..5479a681a 100644 --- a/test/unit/utils_internal_tests.cc +++ b/test/unit/utils_internal_tests.cc @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + #include #include diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py index 96224b35e..9a5b88b44 100644 --- a/tools/npkit/npkit_trace_generator.py +++ b/tools/npkit/npkit_trace_generator.py @@ -14,6 +14,8 @@ def parse_npkit_event_header(npkit_event_header_path): "BARRIER", "PUT", "PUT_PACKET", + "PUT_WITH_SIGNAL", + "PUT_WITH_SIGNAL_AND_FLUSH", "GET", "COPY", "COPY_PACKET", @@ -27,8 +29,6 @@ def parse_npkit_event_header(npkit_event_header_path): "REDUCE_SEND_PACKET", "READ_REDUCE_COPY", "READ_REDUCE_COPY_SEND", - "PUT_WITH_SIGNAL", - "PUT_WITH_SIGNAL_AND_FLUSH", ] executor_op_to_offset = {} for executor_op in executor_ops: