diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 000000000..11da73bb2 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,36 @@ +# Read the Docs configuration file for Sphinx projects +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the OS, Python version and other tools you might need +build: + os: ubuntu-22.04 + apt_packages: + - doxygen + tools: + python: "3.12" + jobs: + pre_build: + - cd docs && doxygen + +# Build documentation in the "docs/" directory with Sphinx +sphinx: + configuration: docs/conf.py + # You can configure Sphinx to use a different builder, for instance use the dirhtml builder for simpler URLs + # builder: "dirhtml" + # Fail on all warnings to avoid broken references + # fail_on_warning: true + +# Optionally build your docs in additional formats such as PDF and ePub +# formats: +# - pdf +# - epub + +# Optional but recommended, declare the Python requirements required +# to build your documentation +# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html +python: + install: + - requirements: docs/requirements.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index eb908db87..a95a8e534 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -95,7 +95,7 @@ endif() include(${PROJECT_SOURCE_DIR}/cmake/AddFormatTargets.cmake) # Find ibverbs and libnuma -find_package(IBVerbs REQUIRED) +find_package(IBVerbs) find_package(NUMA REQUIRED) find_package(Threads REQUIRED) @@ -107,9 +107,13 @@ add_library(mscclpp_obj OBJECT) target_include_directories(mscclpp_obj SYSTEM PRIVATE ${GPU_INCLUDE_DIRS} - ${IBVERBS_INCLUDE_DIRS} ${NUMA_INCLUDE_DIRS}) -target_link_libraries(mscclpp_obj PRIVATE ${GPU_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES} nlohmann_json::nlohmann_json Threads::Threads) +target_link_libraries(mscclpp_obj PRIVATE ${GPU_LIBRARIES} ${NUMA_LIBRARIES} nlohmann_json::nlohmann_json Threads::Threads) +if(IBVERBS_FOUND) + target_include_directories(mscclpp_obj SYSTEM PRIVATE ${IBVERBS_INCLUDE_DIRS}) + target_link_libraries(mscclpp_obj PRIVATE ${IBVERBS_LIBRARIES}) + target_compile_definitions(mscclpp_obj PUBLIC USE_IBVERBS) +endif() set_target_properties(mscclpp_obj PROPERTIES LINKER_LANGUAGE CXX POSITION_INDEPENDENT_CODE 1 VERSION ${MSCCLPP_VERSION} SOVERSION ${MSCCLPP_SOVERSION}) if(USE_CUDA) target_compile_definitions(mscclpp_obj PRIVATE USE_CUDA) diff --git a/apps/nccl/CMakeLists.txt b/apps/nccl/CMakeLists.txt index 33f385da0..189a97591 100644 --- a/apps/nccl/CMakeLists.txt +++ b/apps/nccl/CMakeLists.txt @@ -11,7 +11,7 @@ endif() add_library(mscclpp_nccl_obj OBJECT) target_sources(mscclpp_nccl_obj PRIVATE ${SOURCES}) target_sources(mscclpp_nccl_obj PUBLIC FILE_SET HEADERS FILES ${HEADERS}) -target_include_directories(mscclpp_nccl_obj PRIVATE include SYSTEM PRIVATE ${GPU_INCLUDE_DIRS}) +target_include_directories(mscclpp_nccl_obj PRIVATE include ${PROJECT_SOURCE_DIR}/src/include SYSTEM PRIVATE ${GPU_INCLUDE_DIRS}) target_link_libraries(mscclpp_nccl_obj PRIVATE ${GPU_LIBRARIES} PUBLIC mscclpp_obj) set_target_properties(mscclpp_nccl_obj PROPERTIES LINKER_LANGUAGE CXX POSITION_INDEPENDENT_CODE 1 VERSION ${MSCCLPP_VERSION} SOVERSION ${MSCCLPP_SOVERSION}) if(USE_CUDA) diff --git a/apps/nccl/src/allreduce.hpp b/apps/nccl/src/allreduce.hpp index 23fd0c243..1b85136ae 100644 --- a/apps/nccl/src/allreduce.hpp +++ b/apps/nccl/src/allreduce.hpp @@ -7,12 +7,12 @@ #include #include #include -#include #include #include #include #include "common.hpp" +#include "gpu_data_types.hpp" __device__ mscclpp::DeviceSyncer deviceSyncer; @@ -38,6 +38,11 @@ __forceinline__ __device__ __half2 add_elements(__half2 a, __half2 b) { return __hadd2(a, b); } +template <> +__forceinline__ __device__ __bfloat162 add_elements(__bfloat162 a, __bfloat162 b) { + return __hadd2(a, b); +} + template __forceinline__ __device__ int4 add_vectors_helper(int4 a, int4 b) { int4 ret; @@ -58,6 +63,11 @@ __forceinline__ __device__ int4 add_vectors<__half>(int4 a, int4 b) { return add_vectors_helper<__half2>(a, b); } +template <> +__forceinline__ __device__ int4 add_vectors<__bfloat16>(int4 a, int4 b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template __forceinline__ __device__ uint2 add_vectors_helper(uint2 a, uint2 b) { uint2 ret; @@ -76,6 +86,11 @@ __forceinline__ __device__ uint2 add_vectors<__half>(uint2 a, uint2 b) { return add_vectors_helper<__half2>(a, b); } +template <> +__forceinline__ __device__ uint2 add_vectors<__bfloat16>(uint2 a, uint2 b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template __forceinline__ __device__ int add_vectors_helper(int a, int b) { return bit_cast(add_elements(bit_cast(a), bit_cast(b))); @@ -91,6 +106,11 @@ __forceinline__ __device__ int add_vectors<__half>(int a, int b) { return add_vectors_helper<__half2>(a, b); } +template <> +__forceinline__ __device__ int add_vectors<__bfloat16>(int a, int b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template __forceinline__ __device__ uint32_t add_vectors_helper(uint32_t a, uint32_t b) { return bit_cast(add_elements(bit_cast(a), bit_cast(b))); @@ -106,6 +126,11 @@ __forceinline__ __device__ uint32_t add_vectors<__half>(uint32_t a, uint32_t b) return add_vectors_helper<__half2>(a, b); } +template <> +__forceinline__ __device__ uint32_t add_vectors<__bfloat16>(uint32_t a, uint32_t b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template __forceinline__ __device__ void vectorSum(T* dst, T* src, size_t nElem, int blockId, int nBlocks) { size_t nInt4 = nElem / 4; diff --git a/apps/nccl/src/nccl.cu b/apps/nccl/src/nccl.cu index 01bc56e0b..cb0e7d56e 100644 --- a/apps/nccl/src/nccl.cu +++ b/apps/nccl/src/nccl.cu @@ -4,8 +4,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -54,6 +56,9 @@ struct ncclComm { std::shared_ptr comm; std::vector> connections; std::vector> smSemaphores; + std::shared_ptr executor; + std::shared_ptr allReducePacketIPPlan, allReducePacketOPPlan, allReduceIPPlan, + allReduceOPPlan; std::unordered_map channelInInfos; std::unordered_map channelOutInfos; @@ -61,6 +66,7 @@ struct ncclComm { std::shared_ptr scratchBuff; std::vector remoteScratchRegMemories; + size_t smallMessageSizeBoundary, largeMessageSizeBoundary; uint32_t numScratchBuff; uint32_t buffFlag; }; @@ -97,6 +103,43 @@ static size_t ncclTypeSize(ncclDataType_t type) { return 0; } +double parseSize(const char* value) { + std::string valueStr(value); + std::istringstream iss(valueStr); + long long int units; + double size; + char size_lit = 0; + + if (iss >> size) { + iss >> std::ws; // eat whitespace + iss >> size_lit; + } else { + return -1.0; + } + + if (size_lit != 0 && !std::isspace(size_lit)) { + switch (size_lit) { + case 'G': + case 'g': + units = 1024 * 1024 * 1024; + break; + case 'M': + case 'm': + units = 1024 * 1024; + break; + case 'K': + case 'k': + units = 1024; + break; + default: + return -1.0; + }; + } else { + units = 1; + } + return size * units; +} + static mscclpp::Transport getTransport(int, int) { // if (rank / nRanksPerNode == peerRank / nRanksPerNode) { // return mscclpp::Transport::CudaIpc; @@ -151,6 +194,91 @@ static std::shared_ptr> setupSmChannel return ptr; } +static ncclResult_t ncclAllReduceFallback(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t, ncclComm_t comm, cudaStream_t stream) { + // Checking if the parameters are valids + if (sendbuff == nullptr || recvbuff == nullptr || count == 0 || ncclTypeSize(datatype) == 0 || comm == nullptr) + return ncclInvalidArgument; + + // Declarating variables + size_t sendBytes, recvBytes; + CUdeviceptr sendBasePtr, recvBasePtr; + MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff)); + MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff)); + size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr; + size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr; + uint32_t scratchBuffIdx = (++(comm->buffFlag)) % comm->numScratchBuff; + size_t offsetScratch = (SCRATCH_SIZE / comm->numScratchBuff) * scratchBuffIdx; + int rank = comm->comm->bootstrap()->getRank(); + channelKey sendKey{(void*)sendBasePtr, sendBytes}; + channelKey recvKey{(void*)recvBasePtr, recvBytes}; + mscclpp::DeviceHandle* smChannels = nullptr; + mscclpp::DeviceHandle* smOutChannels = nullptr; + + // Creating the channels + if (count * ncclTypeSize(datatype) <= comm->largeMessageSizeBoundary) { + auto sendIt = comm->channelScratchInfos.find(sendKey); + if (sendIt == comm->channelScratchInfos.end()) { + std::vector channels = + setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast((void*)sendBasePtr)); + ChannelInfo channelInfo{channels, setupSmChannelDeviceHandles(channels)}; + sendIt = comm->channelScratchInfos.emplace(sendKey, channelInfo).first; + } + + smChannels = sendIt->second.smChannelDeviceHandles.get(); + } else { + std::vector remoteMemories; + + auto sendIt = comm->channelInInfos.find(sendKey); + if (sendIt == comm->channelInInfos.end()) { + std::vector channels = + setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast((void*)sendBasePtr)); + ChannelInfo channelInfo{channels, setupSmChannelDeviceHandles(channels)}; + sendIt = comm->channelInInfos.emplace(sendKey, channelInfo).first; + } + + auto recvIt = comm->channelOutInfos.find(recvKey); + if (recvIt == comm->channelOutInfos.end()) { + remoteMemories = + setupRemoteMemories(comm->comm, rank, (void*)recvBasePtr, recvBytes, mscclpp::Transport::CudaIpc); + std::vector outChannels = + setupSmChannels(comm, remoteMemories, const_cast((void*)recvBasePtr)); + ChannelInfo channelInfo{outChannels, setupSmChannelDeviceHandles(outChannels)}; + recvIt = comm->channelOutInfos.emplace(recvKey, channelInfo).first; + } + + smChannels = sendIt->second.smChannelDeviceHandles.get(); + smOutChannels = recvIt->second.smChannelDeviceHandles.get(); + } + + switch (datatype) { + case ncclFloat16: + CUDACHECK(allreduce((half*)sendbuff, (half*)comm->scratchBuff.get(), (half*)recvbuff, smChannels, smOutChannels, + offsetIn, offsetOut, offsetScratch, rank, NRANKS_PER_NODE, + comm->comm->bootstrap()->getNranks(), count, stream)); + break; + case ncclFloat32: + CUDACHECK(allreduce((float*)sendbuff, (float*)comm->scratchBuff.get(), (float*)recvbuff, smChannels, + smOutChannels, offsetIn, offsetOut, offsetScratch, comm->comm->bootstrap()->getRank(), + NRANKS_PER_NODE, comm->comm->bootstrap()->getNranks(), count, stream)); + break; + case ncclBfloat16: + CUDACHECK(allreduce((__bfloat16*)sendbuff, (__bfloat16*)comm->scratchBuff.get(), (__bfloat16*)recvbuff, + smChannels, smOutChannels, offsetIn, offsetOut, offsetScratch, rank, NRANKS_PER_NODE, + comm->comm->bootstrap()->getNranks(), count, stream)); + break; + case ncclInt32: + case ncclUint32: + CUDACHECK(allreduce((int*)sendbuff, (int*)comm->scratchBuff.get(), (int*)recvbuff, smChannels, smOutChannels, + offsetIn, offsetOut, offsetScratch, comm->comm->bootstrap()->getRank(), NRANKS_PER_NODE, + comm->comm->bootstrap()->getNranks(), count, stream)); + break; + default: + return ncclInvalidArgument; + } + return ncclSuccess; +} + NCCL_API ncclResult_t ncclGetVersion(int* version) { if (version == nullptr) return ncclInvalidArgument; *version = MSCCLPP_VERSION; @@ -211,6 +339,30 @@ NCCL_API ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueI commPtr->scratchBuff = mscclpp::allocExtSharedCuda(SCRATCH_SIZE); commPtr->remoteScratchRegMemories = setupRemoteMemories(commPtr->comm, rank, commPtr->scratchBuff.get(), SCRATCH_SIZE, mscclpp::Transport::CudaIpc); + commPtr->executor = std::make_shared(mscclppComm); + + if (getenv("ALLREDUCEPKT_IP_JSON_FILE")) + commPtr->allReducePacketIPPlan = std::make_shared( + mscclpp::ExecutionPlan("allreduce_packet", getenv("ALLREDUCEPKT_IP_JSON_FILE"))); + if (getenv("ALLREDUCEPKT_OP_JSON_FILE")) + commPtr->allReducePacketOPPlan = std::make_shared( + mscclpp::ExecutionPlan("allreduce_packet", getenv("ALLREDUCEPKT_OP_JSON_FILE"))); + if (getenv("ALLREDUCE_IP_JSON_FILE")) + commPtr->allReduceIPPlan = + std::make_shared(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_IP_JSON_FILE"))); + if (getenv("ALLREDUCE_OP_JSON_FILE")) + commPtr->allReduceOPPlan = + std::make_shared(mscclpp::ExecutionPlan("allreduce", getenv("ALLREDUCE_OP_JSON_FILE"))); + if (getenv("ALLREDUCE_SMALL_MSG_BOUNDARY")) + commPtr->smallMessageSizeBoundary = parseSize(getenv("ALLREDUCE_SMALL_MSG_BOUNDARY")); + else + commPtr->smallMessageSizeBoundary = 16 * (1 << 10); + if (getenv("ALLREDUCE_LARGE_MSG_BOUNDARY")) + commPtr->largeMessageSizeBoundary = parseSize(getenv("ALLREDUCE_LARGE_MSG_BOUNDARY")); + else + commPtr->largeMessageSizeBoundary = 1 << 20; + + if (commPtr->smallMessageSizeBoundary > commPtr->largeMessageSizeBoundary) return ncclInvalidArgument; *comm = commPtr; return ncclSuccess; @@ -321,82 +473,50 @@ NCCL_API ncclResult_t ncclBroadcast(const void*, void*, size_t, ncclDataType_t, } NCCL_API ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, - ncclRedOp_t, ncclComm_t comm, cudaStream_t stream) { + ncclRedOp_t reductionOperation, ncclComm_t comm, cudaStream_t stream) { // Checking if the parameters are valids if (sendbuff == nullptr || recvbuff == nullptr || count == 0 || ncclTypeSize(datatype) == 0 || comm == nullptr) return ncclInvalidArgument; // Declarating variables - size_t sendBytes, recvBytes; - CUdeviceptr sendBasePtr, recvBasePtr; - MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff)); - MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff)); - size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr; - size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr; - uint32_t scratchBuffIdx = (++(comm->buffFlag)) % comm->numScratchBuff; - size_t offsetScratch = (SCRATCH_SIZE / comm->numScratchBuff) * scratchBuffIdx; + size_t bytes = count * ncclTypeSize(datatype); int rank = comm->comm->bootstrap()->getRank(); - channelKey sendKey{(void*)sendBasePtr, sendBytes}; - channelKey recvKey{(void*)recvBasePtr, recvBytes}; - mscclpp::DeviceHandle* smChannels = nullptr; - mscclpp::DeviceHandle* smOutChannels = nullptr; - // Creating the channels - if (count * ncclTypeSize(datatype) <= (1 << 20)) { - auto sendIt = comm->channelScratchInfos.find(sendKey); - if (sendIt == comm->channelScratchInfos.end()) { - std::vector channels = - setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast((void*)sendBasePtr)); - ChannelInfo channelInfo{channels, setupSmChannelDeviceHandles(channels)}; - sendIt = comm->channelScratchInfos.emplace(sendKey, channelInfo).first; - } - - smChannels = sendIt->second.smChannelDeviceHandles.get(); + if (bytes < comm->smallMessageSizeBoundary) { + return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); } else { - std::vector remoteMemories; - - auto sendIt = comm->channelInInfos.find(sendKey); - if (sendIt == comm->channelInInfos.end()) { - std::vector channels = - setupSmChannels(comm, comm->remoteScratchRegMemories, const_cast((void*)sendBasePtr)); - ChannelInfo channelInfo{channels, setupSmChannelDeviceHandles(channels)}; - sendIt = comm->channelInInfos.emplace(sendKey, channelInfo).first; + std::shared_ptr plan; + if (bytes <= comm->largeMessageSizeBoundary) + plan = (sendbuff == recvbuff) ? comm->allReducePacketIPPlan : comm->allReducePacketOPPlan; + else + plan = (sendbuff == recvbuff) ? comm->allReduceIPPlan : comm->allReduceOPPlan; + + if (plan == nullptr) + return ncclAllReduceFallback(sendbuff, recvbuff, count, datatype, reductionOperation, comm, stream); + + switch (datatype) { + case ncclFloat16: + comm->executor->execute(rank, (half*)sendbuff, (half*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT16, 1024, + *plan, stream, mscclpp::PacketType::LL8); + break; + case ncclFloat32: + comm->executor->execute(rank, (float*)sendbuff, (float*)recvbuff, bytes, bytes, mscclpp::DataType::FLOAT32, + 1024, *plan, stream, mscclpp::PacketType::LL8); + break; + case ncclBfloat16: + comm->executor->execute(rank, (__bfloat16*)sendbuff, (__bfloat16*)recvbuff, bytes, bytes, + mscclpp::DataType::BFLOAT16, 1024, *plan, stream, mscclpp::PacketType::LL8); + break; + case ncclInt32: + case ncclUint32: + comm->executor->execute(rank, (int*)sendbuff, (int*)recvbuff, bytes, bytes, mscclpp::DataType::UINT32, 1024, + *plan, stream, mscclpp::PacketType::LL8); + break; + default: + return ncclInvalidArgument; } - - auto recvIt = comm->channelOutInfos.find(recvKey); - if (recvIt == comm->channelOutInfos.end()) { - remoteMemories = - setupRemoteMemories(comm->comm, rank, (void*)recvBasePtr, recvBytes, mscclpp::Transport::CudaIpc); - std::vector outChannels = - setupSmChannels(comm, remoteMemories, const_cast((void*)recvBasePtr)); - ChannelInfo channelInfo{outChannels, setupSmChannelDeviceHandles(outChannels)}; - recvIt = comm->channelOutInfos.emplace(recvKey, channelInfo).first; - } - - smChannels = sendIt->second.smChannelDeviceHandles.get(); - smOutChannels = recvIt->second.smChannelDeviceHandles.get(); } - switch (datatype) { - case ncclFloat16: - CUDACHECK(allreduce((half*)sendbuff, (half*)comm->scratchBuff.get(), (half*)recvbuff, smChannels, smOutChannels, - offsetIn, offsetOut, offsetScratch, rank, NRANKS_PER_NODE, - comm->comm->bootstrap()->getNranks(), count, stream)); - break; - case ncclFloat32: - CUDACHECK(allreduce((float*)sendbuff, (float*)comm->scratchBuff.get(), (float*)recvbuff, smChannels, - smOutChannels, offsetIn, offsetOut, offsetScratch, comm->comm->bootstrap()->getRank(), - NRANKS_PER_NODE, comm->comm->bootstrap()->getNranks(), count, stream)); - break; - case ncclInt32: - case ncclUint32: - CUDACHECK(allreduce((int*)sendbuff, (int*)comm->scratchBuff.get(), (int*)recvbuff, smChannels, smOutChannels, - offsetIn, offsetOut, offsetScratch, comm->comm->bootstrap()->getRank(), NRANKS_PER_NODE, - comm->comm->bootstrap()->getNranks(), count, stream)); - break; - default: - return ncclInvalidArgument; - } return ncclSuccess; } @@ -442,6 +562,7 @@ NCCL_API ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t CUDACHECK(allgather((int*)sendbuff, (int*)nullptr, (int*)recvbuff, smChannels, offsetOut, rank, NRANKS_PER_NODE, nRank, bytes / sizeof(int), stream)); } + return ncclSuccess; } diff --git a/apps/nccl/test/CMakeLists.txt b/apps/nccl/test/CMakeLists.txt index 025d2db79..66c8e2698 100644 --- a/apps/nccl/test/CMakeLists.txt +++ b/apps/nccl/test/CMakeLists.txt @@ -4,5 +4,9 @@ find_package(MPI) add_executable(nccl_api_test nccl_api_test.cc) -target_link_libraries(nccl_api_test mscclpp mscclpp_nccl ${GPU_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES} Threads::Threads MPI::MPI_CXX) +target_link_libraries(nccl_api_test mscclpp mscclpp_nccl ${GPU_LIBRARIES} ${NUMA_LIBRARIES} Threads::Threads MPI::MPI_CXX) +if(IBVERBS_FOUND) + target_link_libraries(nccl_api_test ${IBVERBS_LIBRARIES}) + target_compile_definitions(nccl_api_test PRIVATE USE_IBVERBS) +endif() target_include_directories(nccl_api_test PRIVATE ${PROJECT_SOURCE_DIR}/include ${PROJECT_SOURCE_DIR}/apps/nccl/include) diff --git a/docs/.gitignore b/docs/.gitignore index 00d9344fb..a69fac7ab 100644 --- a/docs/.gitignore +++ b/docs/.gitignore @@ -1,3 +1,2 @@ doxygen/ _build/ -sphinx/ diff --git a/docs/README.md b/docs/README.md index 2bb9c1efb..6cf2cc924 100644 --- a/docs/README.md +++ b/docs/README.md @@ -9,7 +9,7 @@ 2. Install Python packages below. If you install them on the user's local, you need to include `~/.local/bin` to `$PATH` (to use `sphinx-build`). ```bash - $ sudo python3 -m pip install sphinx sphinx_rtd_theme breathe + $ sudo python3 -m pip install -r ./requirements.txt ``` 3. Create Doxygen documents. @@ -21,7 +21,7 @@ 4. Create Sphinx documents. ```bash - $ sphinx-build -b html -Dbreathe_projects.mscclpp=$PWD/doxygen/xml $PWD $PWD/sphinx + $ make html ``` -5. Done. The HTML files will be on `sphinx/` directory. +5. Done. The HTML files will be on `_build/` directory. diff --git a/docs/api/index.rst b/docs/api/index.rst new file mode 100644 index 000000000..461a9fbdb --- /dev/null +++ b/docs/api/index.rst @@ -0,0 +1,5 @@ +API Reference +============= + +.. doxygennamespace:: mscclpp + :members: diff --git a/docs/conf.py b/docs/conf.py index 4a94b3aa5..4d3a91022 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -14,12 +14,13 @@ # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration -extensions = ["breathe"] +extensions = ["breathe", "myst_parser"] templates_path = ["_templates"] exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # Breathe configuration +breathe_projects = {"mscclpp": "./doxygen/xml"} breathe_default_project = "mscclpp" # -- Options for HTML output ------------------------------------------------- diff --git a/docs/design/design.md b/docs/design/design.md new file mode 100644 index 000000000..82b6e0965 --- /dev/null +++ b/docs/design/design.md @@ -0,0 +1,157 @@ +# MSCCL++ Design Document +## Introduction +MSCCL++ redefines inter-GPU communication interfaces, thereby delivering a highly efficient and customizable communication stack for distributed GPU applications. Its design is specifically tailored to accommodate diverse performance optimization scenarios often encountered in state-of-the-art AI applications. The figure below provides a high-level overview of MSCCL++ abstractions in CUDA, C, and Python. + + +```{figure} ../figs/abstractions.png +:name: MSCCL++ Abstractions +:alt: MSCCL++ Abstractions +:align: center + +MSCCL++ Abstractions Overview +``` + +The followings highlight the key features of MSCCL++. +* **Light-weight and multi-layer abstractions.** MSCCL++ provides communication abstractions at lowest level close to hardware and at the highest level close to application API. The lowest level of abstraction is ultra light weight which enables a user to implement logics of data movement for a collective operation such as AllReduce inside a GPU kernel extremely efficiently without worrying about memory ordering of different ops. The modularity of MSCCL++ enables a user to construct the building blocks of MSCCL++ in a high level abstraction in Python and feed them to a CUDA kernel in order to facilitate the user's productivity. + +* **1-sided 0-copy synchronous and asynchronous abstracts.** MSCCL++ provides fine-grained synchronous and asynchronous 0-copy 1-sided abstracts for communication primitives such as `put()`, `get()`, `signal()`, `flush()`, and `wait()`. The 1-sided abstractions allows a user to asynchronously `put()` their data on the remote GPU as soon as it is ready without requiring the remote side to issue any receive instruction. This enables users to easily implement flexible communication logics, such as overlapping communication with computation, or implementing customized collective communication algorithms without worrying about potential deadlocks. Additionally, the 0-copy capability enables MSCCL++ to directly transfer data between user's buffers without using intermediate internal buffers which saves GPU bandwidth and memory capacity. + +* **Unified abstractions for different interconnection hardware.** MSCCL++ provides consistent abstractions regardless of the location of the remote GPU (either on the local node or on a remote node) or the underlying link (either NVLink/xGMI or InfiniBand). This simplifies the code for inter-GPU communication, which is often complex due to memory ordering of GPU/CPU read/writes and therefore, is error-prone. + +## Concepts + +To implement the list of features above, some concepts are introduced. +### Channel +MSCCL++ provides peer-to-peer communication methods between GPUs. A peer-to-peer connection between two GPUs is called a *Channel*. Channels are constructed by MSCCL++ host-side interfaces and copied to GPUs during initialization. Channels provide *GPU-side interfaces*, which means that all communication methods are defined as a device function to be called from a GPU kernel code. Following code shows the basic usage for channel, the `put()` method in the following code copies 1KB data from the local GPU to a remote GPU. +```cpp +__global__ void gpuKernel() { + ... + // Only one thread is needed for this method. + channel.put(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024); + ... +} +``` +MSCCL++ also provides efficient synchronization methods, `signal()`, `flush()`, and `wait()`. We will discuss these methods in the following sections. + +#### SmChannel & ProxyChannel +MSCCL++ delivers two types of channels, **ProxyChannel** and **SmChannel**. `ProxyChannel` provides (R)DMA-based data copy and synchronization methods. When called, these methods send/receive a signal to/from a host-side proxy (hence the name `ProxyChannel`), which will trigger (R)DMA (such as `cudaMemcpy*` or `ibv_post_send`) or issue synchronization methods (such as `cudaStreamSynchronize` or `ibv_poll_cq`). Since the key functionalities are run by the proxy, ProxyChannel requires only a single GPU thread to call its methods. See all `ProxyChannel` methods from [here](https://github.com/microsoft/mscclpp/blob/main/include/mscclpp/proxy_channel_device.hpp). + +On the other hand, `SmChannel` provides memory-mapping-based copy and synchronization methods. When called, these methods will directly use GPU threads to read/write from/to the remote GPU's memory space. Comparing against ProxyChannel, SmChannel is especially performant for low-latency scenarios, while it may need many GPU threads to call copying methods at the same time to achieve high copying bandwidth. See all SmChannel methods from [here](https://github.com/microsoft/mscclpp/blob/main/include/mscclpp/sm_channel_device.hpp). + +### Fifo & Trigger +One of the key features of MSCCL++ is to offload the communication logic from the GPU to the CPU. +To offload the communication logic from the GPU to the CPU, MSCCL++ introduces the concept of `Fifo` and `Trigger`. A Fifo is a circular buffer that shared between the GPU and the CPU. It is used to store `Trigger`. A `Trigger` is a signal that is sent from the GPU to the CPU to notify the CPU that there are commands in the Fifo that need to be processed. The CPU will then process the commands in the Fifo and send a signal back to the GPU to notify the GPU that the commands have been processed. The implementation details of Fifo and Trigger can be found in following sections. + +### ProxyService +Proxy service is a persistent service that resides in the CPU side. It functions as a polling service that receives the message `Trigger` from the GPU side and then transfers data according to the command. When we use `ProxyChannel` for communication, a `Trigger` is sent from the GPU side to the `ProxyService`. Then `ProxyService` will invoke `cudaMemcpy*` or `IB verbs` to transfer data to the targe device. + +## Implementation + +The core of MSCCL++ is implemented in C++ and CUDA. We offer both C++ and Python APIs for initializing communication channels. For interactions within the GPU kernel, we offer a collection of low-level device functions. Subsequent sections will delve into these interfaces and the methodology for transferring communication logic from the GPU to the CPU. + +### Interfaces +This section delivers a comprehensive overview of the MSCCL++ interfaces, encompassing both the setup and initialization of communication channels and the MSCCL++ kernel programming model. + +#### Communication setup and initialization APIs +MSCCL++ provides APIs in both C++ and Python for establishing communication channels, with further information available in the [Initialization](../getting-started/tutorials/initialization.md) section. Presently, it supports two types of transports: `cudaIPC` for `NVLink/xGMI`, and `IB` for `InfiniBand`. Users are empowered to select the connection type that best suits their hardware infrastructure. + +#### MSCCL++ kernel programming model +MSCCL++ offers one-sided communication methods directly callable from a GPU kernel, encompassing two primary API categories: data copy and synchronization. The data copy API features functions such as `put()`, `get()`, `read()`, and `write()`, while the synchronization API comprises `signal()`, `flush()`, and `wait()`. Demonstrated below, the basic utilization of the data copy API involves the `put()` method, which facilitates the transfer of 1KB of data from a local GPU to a remote GPU. Then send a signal to remote peer to notify the data is ready to use. To receive the data, the remote peer can call `wait()` method. +This operation is executed within a kernel launched with a single block. +```cpp +// Running on rank 0 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChannel) { + smChannel[0].put(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024, /*threadId*/ threadIdx.x, /*numThreads*/ blockDim.x); + __syncthreads(); + if (threadIdx.x == 0) { + smChannel[0].signal(); + } +} + +// Running on rank 1 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChannel) { + if (threadIdx.x == 0) { + smChannel[0].wait(); + } + __syncthreads(); + // Data is ready to use +} +``` + +Similar to the LL protocol offered by NCCL, MSCCL++ introduces a `Packet` structure designed to facilitate the transfer of both data and flags within a single instruction, proving particularly beneficial for applications where latency is a critical concern. The following code shows the basic usage of the `Packet` structure. The flag should be same for sender and receiver side. +```cpp +// Running on rank 0 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChans, int flag) { + smChans[0].putPackets(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024, /*threadId*/ threadIdx.x, /*numThreads*/ blockDim.x, + /*flag=*/ flag); +} + +// Running on rank 1 +__device__ void gpuKernel(mscclpp::SmChannelDeviceHandle* smChans, int flag) { + smChans[0].getPackets(/*dstOffset=*/ 0, /*srcOffset=*/ 0, /*size=*/ 1024, /*threadId*/ threadIdx.x, /*numThreads*/ blockDim.x, + /*flag=*/ flag); + // Data is ready to use +} +``` + +### The mechanism for offloading communication logic from the GPU to the CPU + +As mentioned in the previous section, the offloading of communication logic from the GPU to the CPU is accomplished through the `Fifo` and `Trigger` mechanism. + +The accompanying figure details the structure of `Tigger`, employing three bits to denote the operation type: `data transfer`, `signal`, and `flush`. The remaining fields specify the precise data locations for both local and remote buffers. + +``` +|-------------------|-------------------|-------------------|-----------------|-----------------|---------|-------------------|---------------| +| 32bit size | 32bit src offset | 32bit dst offset | 9bit src mem id | 9bit dst mem id | 3bit op | 10bit channel id | 1bit reserved | +|-------------------|-------------------|-------------------|-----------------|-----------------|---------|-------------------|---------------| +``` +
The proxy trigger format
+ +Page-locked memory is utilized for the `Fifo`, guaranteeing access by both the CPU and GPU. On the CPU side, a polling thread periodically checks the Fifo for new commands. Upon processing a command, it updates an incremented counter to signal to the GPU that the command has been executed. Users wishing to ensure a command has been processed can invoke `flush()`, which waits for the device-side counter to reflect this update. + +## Use Cases + +In this section, we will discuss several use cases that demonstrate the capabilities of MSCCL++. + +### Overlapping communication with computation + +MSCCL++ enables the offloading of communication logic from the GPU to the CPU, facilitating the overlapping of communication and computation processes. The code snippet provided illustrates this overlapping technique. In the depicted scenario, the GPU emits a signal to the CPU indicating readiness for data transfer. Subsequently, while the GPU continues to execute computation tasks, the CPU initiates the data transfer to the designated target device. +```cpp +__device__ void gpuKernel(mscclpp::SimpleProxyChannelDeviceHandle* proxyChannel) { + int tid = threadIdx.x + blockIdx.x * blockDim.x; + // Send a trigger to the CPU + if (tid == 0) { + proxyChannel[0].putWithSignal(/*dstOffset*/ 0, /*srcOffset*/ 0, /*size*/ 1024); + } + // Continue computation + matrixMul() + // ... +} +``` + +### Fusion of communication and computation + +Traditional communication libraries enforce a separation between communication and computation, creating a bottleneck where communication must await the completion of computation, especially when data dependencies exist. In contrast, MSCCL++ leverages its low-level premitives to facilitate the seamless integration of communication with computation. By segmenting the computation into tiles, MSCCL++ enables the simultaneous pipelining of computation and communication tasks. This approach not only mitigates the communication delay by overlapping processes but also significantly improves throughput by leveraging the low-level API for fine-grained control over the hardware, ensuring optimal efficiency. + +### Implementing customized collective communication algorithms + +MCSCL++ offers a low-level communication API, allowing users to design customized collective communication algorithms. The following code demonstrates how to implement a customized All2All algorithm using MSCCL++. +```cpp +using DeviceHandle = mscclpp::DeviceHandle; +__device__ void localAlltoall(DeviceHandle* proxyChans, int rank, + int nRanksPerNode, size_t nElements) { + int remoteRank = ((int)blockIdx.x < rank) ? blockIdx.x : blockIdx.x + 1; + for (int i = 1; i < nRanksPerNode; i++) { + DeviceHandle proxyChan = proxyChans[blockIdx.x]; + if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank + i) % nRanksPerNode) { + proxyChan.putWithSignalAndFlush(rank * nElements * sizeof(int), remoteRank * nElements * sizeof(int), + nElements * sizeof(int)); + } + // wait for the data from GPU (rank-i) % nranksPerNode to arrive + if (threadIdx.x == 0 && remoteRank % nRanksPerNode == (rank - i + nRanksPerNode) % nRanksPerNode) { + proxyChan.wait(); + } + deviceSyncer.sync(nRanksPerNode - 1); + } +} +``` diff --git a/docs/quickstart.md b/docs/getting-started/quickstart.md similarity index 93% rename from docs/quickstart.md rename to docs/getting-started/quickstart.md index a27bba561..355d48240 100644 --- a/docs/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -20,6 +20,16 @@ lsmod | grep nvidia_peermem ``` +## Build with Docker Images + +We provide docker images which package all prerequisites for MSCCL++. You can setup your dev environment with the following command. + +```bash +$ docker run -it --privileged --net=host --ipc=host --gpus all ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.2 mscclpp-dev bash +``` + +See all available images [here](https://github.com/microsoft/mscclpp/pkgs/container/mscclpp%2Fmscclpp). + ## Build from Source CMake 3.25 or later is required. diff --git a/docs/getting-started/tutorials/customized-proxy-service.md b/docs/getting-started/tutorials/customized-proxy-service.md new file mode 100644 index 000000000..232f81066 --- /dev/null +++ b/docs/getting-started/tutorials/customized-proxy-service.md @@ -0,0 +1 @@ +# Customize the Proxy Service diff --git a/docs/getting-started/tutorials/index.rst b/docs/getting-started/tutorials/index.rst new file mode 100644 index 000000000..7ee91b194 --- /dev/null +++ b/docs/getting-started/tutorials/index.rst @@ -0,0 +1,16 @@ +Tutorials +---------- + +This tutorial section provides a step-by-step guide to help you get started with the C++/Python API. + +.. toctree:: + :maxdepth: 1 + :caption: Tutorials + :hidden: + + initialization + proxy-channel + sm-channel + packet-api + customized-proxy-service + python-api diff --git a/docs/getting-started/tutorials/initialization.md b/docs/getting-started/tutorials/initialization.md new file mode 100644 index 000000000..0bdd8ad45 --- /dev/null +++ b/docs/getting-started/tutorials/initialization.md @@ -0,0 +1,71 @@ +# Commnunication initialize with mscclpp API + +In this tutorial, you will write a simple program to initialize communication between eight GPUs using MSCCL++ C++ API. You will also learn how to use the Python API to initialize communication. + +## Prerequisites +A system with eight GPUs is required to run this tutorial. + +Also make sure that you have installed MSCCL++ on your system. If not, please follow the [quick start](../quickstart.md). + +## Initialize Communication with C++ API +We will setup a mesh topology with eight GPUs. Each GPU will be connected to its neighbors. The following code shows how to initialize communication with MSCCL++ C++ API. + +```cpp +#include +#include +#include + +#include +#include +#include + +template +using DeviceHandle = mscclpp::DeviceHandle; +__constant__ DeviceHandle constProxyChans[8]; + +void setupMeshTopology(int rank, int worldsize, void* data, size_t dataSize) { + std::string ip_port = "10.0.0.4:50000"; + auto bootstrap = std::make_shared(rank, worldsize); + bootstrap->initialize(ip_port); + mscclpp::Communicator comm(bootstrap); + mscclpp::ProxyService proxyService; + + std::vector semaphoreIds; + std::vector localMemories; + std::vector>> connections(world_size); + std::vector> remoteMemories; + + for (int r = 0; r < world_size; ++r) { + if (r == rank) continue; + mscclpp::Transport transport = mscclpp::Transport::CudaIpc; + // Connect with all other ranks + connections[r] = comm.connectOnSetup(r, 0, transport); + auto memory = comm.registerMemory(data, dataSize, mscclpp::Transport::CudaIpc | ibTransport); + localMemories.push_back(memory); + comm.sendMemoryOnSetup(memory, r, 0); + remoteMemories.push_back(comm.recvMemoryOnSetup(r, 0)); + } + + comm.setup(); + + for (int r = 0; r < world_size; ++r) { + if (r == rank) continue; + semaphoreIds.push_back(proxyService.buildAndAddSemaphore(comm, connections[r].get())); + } + + comm.setup(); + + std::vector> proxyChannels; + for (size_t i = 0; i < semaphoreIds.size(); ++i) { + proxyChannels.push_back(mscclpp::deviceHandle(mscclpp::SimpleProxyChannel( + proxyService.proxyChannel(semaphoreIds[i]), proxyService.addMemory(remoteMemories[i].get()), + proxyService.addMemory(localMemories[i])))); + } + + if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle)) { + std::runtime_error("unexpected error"); + } + CUDACHECK(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(), + sizeof(DeviceHandle) * proxyChannels.size())); +} +``` diff --git a/docs/getting-started/tutorials/packet-api.md b/docs/getting-started/tutorials/packet-api.md new file mode 100644 index 000000000..8f4ea7074 --- /dev/null +++ b/docs/getting-started/tutorials/packet-api.md @@ -0,0 +1 @@ +# Packet API for latency sensitive applications diff --git a/docs/getting-started/tutorials/proxy-channel.md b/docs/getting-started/tutorials/proxy-channel.md new file mode 100644 index 000000000..fec5c4cc0 --- /dev/null +++ b/docs/getting-started/tutorials/proxy-channel.md @@ -0,0 +1,3 @@ +# Offload commnunication to CPU with ProxyChannel + +TBU diff --git a/docs/getting-started/tutorials/python-api.md b/docs/getting-started/tutorials/python-api.md new file mode 100644 index 000000000..9e6c5627b --- /dev/null +++ b/docs/getting-started/tutorials/python-api.md @@ -0,0 +1,92 @@ +# Working with Python API + +We provide Python API which help to initialze and setup the channel easily. +In this tutorial, you will write a simple program to initialize communication between eight GPUs using MSCCL++ Python API. + +## Setup Channel with Python API + +We will setup a mesh topology with eight GPUs. Each GPU will be connected to its neighbors. The following code shows how to initialize communication with MSCCL++ Python API. +```python +from mpi4py import MPI +import cupy as cp + +from mscclpp import ( + ProxyService, + Transport, +) +import mscclpp.comm as mscclpp_comm + +def create_connection(group: mscclpp_comm.CommGroup, transport: str): + remote_nghrs = list(range(group.nranks)) + remote_nghrs.remove(group.my_rank) + if transport == "NVLink": + tran = Transport.CudaIpc + elif transport == "IB": + tran = group.my_ib_device(group.my_rank % 8) + else: + assert False + connections = group.make_connection(remote_nghrs, tran) + return connections + +if __name__ == "__main__": + mscclpp_group = mscclpp_comm.CommGroup(MPI.COMM_WORLD) + connections = create_connection(mscclpp_group, "NVLink") + nelems = 1024 + memory = cp.zeros(nelem, dtype=cp.int32) + proxy_service = ProxyService() + simple_channels = group.make_proxy_channels(proxy_service, memory, connections) + proxy_service.start_proxy() + mscclpp_group.barrier() + launch_kernel(mscclpp_group.my_rank, mscclpp_group.nranks, simple_channels, memory) + cp.cuda.runtime.deviceSynchronize() + mscclpp_group.barrier() +``` + +### Launch Kernel with Python API +We provide some Python utils to help you launch kernel via python. Here is a exampl. +```python +from mscclpp.utils import KernelBuilder, pack + +def launch_kernel(my_rank: int, nranks: int, simple_channels: List[SimpleProxyChannel], memory: cp.ndarray): + file_dir = os.path.dirname(os.path.abspath(__file__)) + kernel = KernelBuilder(file="test.cu", kernel_name="test", file_dir=file_dir).get_compiled_kernel() + params = b"" + first_arg = next(iter(simple_channels.values())) + size_of_channels = len(first_arg.device_handle().raw) + device_handles = [] + for rank in range(nranks): + if rank == my_rank: + device_handles.append( + bytes(size_of_channels) + ) # just zeros for semaphores that do not exist + else: + device_handles.append(simple_channels[rank].device_handle().raw) + # keep a reference to the device handles so that they don't get garbage collected + d_channels = cp.asarray(memoryview(b"".join(device_handles)), dtype=cp.uint8) + params = pack(d_channels, my_rank, nranks, memory.size) + + nblocks = 1 + nthreads = 512 + kernel.launch_kernel(params, nblocks, nthreads, 0, None) +``` + +The test kernel is defined in `test.cu` as follows: +```cuda +#include +#include + +// be careful about using channels[my_rank] as it is inavlie and it is there just for simplicity of indexing +extern "C" __global__ void __launch_bounds__(1024, 1) + simple_proxy_channel(mscclpp::SimpleProxyChannelDeviceHandle* channels, int my_rank, int nranks, + int num_elements) { + int tid = threadIdx.x; + int nthreads = blockDim.x; + uint64_t size_per_rank = (num_elements * sizeof(int)) / nranks; + uint64_t my_offset = size_per_rank * my_rank; + __syncthreads(); + if (tid < nranks && tid != my_rank) { + channels[tid].putWithSignalAndFlush(my_offset, my_offset, size_per_rank); + channels[tid].wait(); + } +} +``` diff --git a/docs/getting-started/tutorials/sm-channel.md b/docs/getting-started/tutorials/sm-channel.md new file mode 100644 index 000000000..191e47b36 --- /dev/null +++ b/docs/getting-started/tutorials/sm-channel.md @@ -0,0 +1,3 @@ +# Using SmChannel for Intra-Node Communication + +TBU diff --git a/docs/index.rst b/docs/index.rst index ba060047c..e243d2824 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,11 +6,54 @@ Welcome to MSCCL++'s documentation! =================================== +MSCCL++ is a GPU-driven communication stack for scalable AI applications. It is designed to provide a high-performance, scalable, and customizable communication stack for distributed GPU applications. + +Getting Started +--------------- +- Follow the :doc:`quick start ` for your platform of choice. +- Take a look at the :doc:`tutorials ` to learn how to write your first mscclpp program. + +.. toctree:: + :maxdepth: 1 + :caption: Getting Started + :hidden: + + getting-started/quickstart + getting-started/tutorials/index + +Design +------- +- :doc:`Design ` doc for those who want to understand the internals of MSCCL++. + +.. toctree:: + :maxdepth: 1 + :caption: Design + :hidden: + + design/design + +Performance +--------------- +- We evaluate the performance of MSCCL++ in A100 and H100. Here are some :doc:`performance results ` for all-reduce operations. + .. toctree:: - :maxdepth: 2 - :caption: Contents: + :maxdepth: 1 + :caption: Performance + :hidden: + + performance/performance-ndmv4 + +C++ API +--------------- +- :doc:`mscclpp ` +.. toctree:: + :maxdepth: 1 + :caption: C++ API + :hidden: + + api/index Indices and tables ================== @@ -18,9 +61,3 @@ Indices and tables * :ref:`genindex` * :ref:`modindex` * :ref:`search` - -Docs -==== - -.. doxygennamespace:: mscclpp - :members: diff --git a/docs/performance-ndmv4.md b/docs/performance/performance-ndmv4.md similarity index 100% rename from docs/performance-ndmv4.md rename to docs/performance/performance-ndmv4.md diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 000000000..82bb70d03 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,3 @@ +breathe +sphinx_rtd_theme +myst_parser diff --git a/include/mscclpp/executor.hpp b/include/mscclpp/executor.hpp index 23dc7cece..52c3c6da9 100644 --- a/include/mscclpp/executor.hpp +++ b/include/mscclpp/executor.hpp @@ -15,6 +15,7 @@ enum class DataType { UINT32, FLOAT16, FLOAT32, + BFLOAT16, }; enum class PacketType { diff --git a/include/mscclpp/nvls_device.hpp b/include/mscclpp/nvls_device.hpp index 57f65464c..402a65218 100644 --- a/include/mscclpp/nvls_device.hpp +++ b/include/mscclpp/nvls_device.hpp @@ -8,7 +8,7 @@ #include #if defined(MSCCLPP_DEVICE_CUDA) -#include +#include #endif // defined(MSCCLPP_DEVICE_CUDA) #include "device.hpp" @@ -25,27 +25,32 @@ struct DeviceMulticastPointerDeviceHandle { size_t bufferSize; #if defined(MSCCLPP_DEVICE_CUDA) - template + template MSCCLPP_DEVICE_INLINE static void multimemLoadReduce(TValue& val, T* ptr) { - if constexpr (std::is_same::value && std::is_same::value) { + if constexpr (std::is_same_v && std::is_same_v) { asm("multimem.ld_reduce.relaxed.sys.global.add.v4.f32 {%0,%1,%2,%3}, [%4];" : "=r"(val.x), "=r"(val.y), "=r"(val.z), "=r"(val.w) : "l"(ptr) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { + asm("multimem.ld_reduce.relaxed.sys.global.add.v2.f32 {%0,%1}, [%2];" + : "=r"(val.x), "=r"(val.y) + : "l"(ptr) + : "memory"); + } else if constexpr (std::is_same_v && std::is_same_v) { + asm("multimem.ld_reduce.relaxed.sys.global.add.f32 {%0}, [%1];" : "=r"(val.x) : "l"(ptr) : "memory"); + } else if constexpr (std::is_same_v && std::is_same_v) { asm("multimem.ld_reduce.relaxed.sys.global.add.v4.f16x2 {%0,%1,%2,%3}, [%4];" : "=r"(val.x), "=r"(val.y), "=r"(val.z), "=r"(val.w) : "l"(ptr) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { asm("multimem.ld_reduce.relaxed.sys.global.add.v2.f16x2 {%0,%1}, [%2];" : "=r"(val.x), "=r"(val.y) : "l"(ptr) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { asm("multimem.ld_reduce.relaxed.sys.global.add.f16x2 {%0}, [%1];" : "=r"(val.x) : "l"(ptr) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { - asm("multimem.ld_reduce.relaxed.sys.global.add.f16 {%0}, [%1];" : "=r"(val.x) : "l"(ptr) : "memory"); } else { static_assert(dependentFalse, "Not supported type"); } @@ -53,21 +58,24 @@ struct DeviceMulticastPointerDeviceHandle { template MSCCLPP_DEVICE_INLINE static void multimemStore(const TValue& val, T* ptr) { - if constexpr (std::is_same::value && std::is_same::value) { + if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.st.relaxed.sys.global.v4.f32 [%0], {%1,%2,%3,%4};" ::"l"(ptr), "r"(val.x), "r"(val.y), "r"(val.z), "r"(val.w) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { + asm volatile("multimem.st.relaxed.sys.global.v2.f32 [%0], {%1,%2};" ::"l"(ptr), "r"(val.x), "r"(val.y) + : "memory"); + } else if constexpr (std::is_same_v && std::is_same_v) { + asm volatile("multimem.st.relaxed.sys.global.f32 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); + } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.st.relaxed.sys.global.v4.f16x2 [%0], {%1,%2,%3,%4};" ::"l"(ptr), "r"(val.x), "r"(val.y), "r"(val.z), "r"(val.w) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.st.relaxed.sys.global.v2.f16x2 [%0], {%1,%2};" ::"l"(ptr), "r"(val.x), "r"(val.y) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.st.relaxed.sys.global.f16x2 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { - asm volatile("multimem.st.relaxed.sys.global.f16 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); } else { static_assert(dependentFalse, "Not supported type"); } @@ -75,21 +83,24 @@ struct DeviceMulticastPointerDeviceHandle { template MSCCLPP_DEVICE_INLINE static void multimemStoreReduce(const TValue& val, T* ptr) { - if constexpr (std::is_same::value && std::is_same::value) { + if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.red.relaxed.sys.global.add.v4.f32 [%0], {%1,%2,%3,%4};" ::"l"(ptr), "r"(val.x), "r"(val.y), "r"(val.z), "r"(val.w) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { + asm volatile("multimem.red.relaxed.sys.global.add.v2.f32 [%0], {%1,%2};" ::"l"(ptr), "r"(val.x), "r"(val.y) + : "memory"); + } else if constexpr (std::is_same_v && std::is_same_v) { + asm volatile("multimem.red.relaxed.sys.global.add.f32 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); + } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.red.relaxed.sys.global.add.v4.f16x2 [%0], {%1,%2,%3,%4};" ::"l"(ptr), "r"(val.x), "r"(val.y), "r"(val.z), "r"(val.w) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.red.relaxed.sys.global.add.v2.f16x2 [%0], {%1,%2};" ::"l"(ptr), "r"(val.x), "r"(val.y) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { + } else if constexpr (std::is_same_v && std::is_same_v) { asm volatile("multimem.red.relaxed.sys.global.add.f16x2 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); - } else if constexpr (std::is_same::value && std::is_same::value) { - asm volatile("multimem.red.relaxed.sys.global.add.f16 [%0], {%1};" ::"l"(ptr), "r"(val.x) : "memory"); } else { static_assert(dependentFalse, "Not supported type"); } diff --git a/python/mscclpp/__init__.py b/python/mscclpp/__init__.py index c9df30cf1..ce9917ab5 100644 --- a/python/mscclpp/__init__.py +++ b/python/mscclpp/__init__.py @@ -6,6 +6,7 @@ from ._mscclpp import ( Communicator, Connection, + connect_nvls_collective, EndpointConfig, Fifo, Host2DeviceSemaphore, diff --git a/python/mscclpp/comm.py b/python/mscclpp/comm.py index 4f0111d48..ca3620924 100644 --- a/python/mscclpp/comm.py +++ b/python/mscclpp/comm.py @@ -8,6 +8,7 @@ from ._mscclpp import ( Communicator, Connection, + connect_nvls_collective, EndpointConfig, Host2DeviceSemaphore, Host2HostSemaphore, diff --git a/python/mscclpp/executor_py.cpp b/python/mscclpp/executor_py.cpp index 9f58eac8f..dadbf40f6 100644 --- a/python/mscclpp/executor_py.cpp +++ b/python/mscclpp/executor_py.cpp @@ -16,7 +16,8 @@ void register_executor(nb::module_& m) { .value("int32", DataType::INT32) .value("uint32", DataType::UINT32) .value("float16", DataType::FLOAT16) - .value("float32", DataType::FLOAT32); + .value("float32", DataType::FLOAT32) + .value("bfloat16", DataType::BFLOAT16); nb::enum_(m, "PacketType").value("LL8", PacketType::LL8).value("LL16", PacketType::LL16); diff --git a/python/mscclpp_benchmark/allreduce.cu b/python/mscclpp_benchmark/allreduce.cu index f5e126770..0adf8d548 100644 --- a/python/mscclpp_benchmark/allreduce.cu +++ b/python/mscclpp_benchmark/allreduce.cu @@ -788,7 +788,7 @@ extern "C" __global__ void __launch_bounds__(1024, 1) #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900 -// Barrier among all devices followed by a memory fence +// Barrier among all devices // Should be called by all threads on all devices // Assumes \p num_threads_per_block >= \p num_ranks __forceinline__ __device__ void barrier(mscclpp::SmDevice2DeviceSemaphoreDeviceHandle* semaphores, int thread_id, @@ -806,36 +806,78 @@ __forceinline__ __device__ void barrier(mscclpp::SmDevice2DeviceSemaphoreDeviceH deviceSyncer.sync(num_blocks); } -extern "C" __global__ void __launch_bounds__(1024, 1) - allreduce6(mscclpp::SmDevice2DeviceSemaphoreDeviceHandle* semaphores, - mscclpp::DeviceMulticastPointerDeviceHandle nvlsPtrs, TYPE* buff, int my_rank, int nranks, - size_t nelem) { - float* dev_ptr = (float*)nvlsPtrs.devicePtr; - float* mc_ptr = (float*)nvlsPtrs.mcPtr; +// Assumes \p kVecSize is 1, 2, 4, or 8 (default 8) +template +MSCCLPP_DEVICE_INLINE void allreduce6_helper(mscclpp::SmDevice2DeviceSemaphoreDeviceHandle* semaphores, + mscclpp::DeviceMulticastPointerDeviceHandle nvlsPtrs, int my_rank, + int num_ranks, size_t num_elements) { + DataType* mc_ptr = (DataType*)nvlsPtrs.mcPtr; int tid = threadIdx.x; int bid = blockIdx.x; + int num_threads_per_block = blockDim.x; int num_blocks = gridDim.x; // start with a barrier to ensure all devices have written their values // to their own memory (that is part of the multicast memory) // before reading them in this kernel - barrier(semaphores, tid, bid, num_blocks, nranks); - - int my_st = ((int64_t)nelem * (int64_t)my_rank) / (int64_t)nranks; - int my_en = ((int64_t)nelem * (int64_t)(my_rank + 1)) / (int64_t)nranks; - - int my_offset = (tid + bid * blockDim.x) * 4; - int my_step = blockDim.x * gridDim.x * 4; - - for (int idx = my_st + my_offset; idx < my_en; idx += my_step) { - uint4 val; // fits 8 cutlass::half_t elements; i.e., 4 half2 elements - mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, mc_ptr + idx); - mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, mc_ptr + idx); + barrier(semaphores, tid, bid, num_blocks, num_ranks); + + // every device loads, reduces, and stores a partition of the multicast memory + int rank_start = ((int64_t)num_elements * (int64_t)my_rank) / (int64_t)num_ranks; + int rank_end = ((int64_t)num_elements * (int64_t)(my_rank + 1)) / (int64_t)num_ranks; + + int thread_offset = (bid * num_threads_per_block + tid) * kVecSize; + int thread_step = (num_threads_per_block * num_blocks) * kVecSize; // number of threads * vector size + + for (int idx = rank_start + thread_offset; idx < rank_end; idx += thread_step) { + if constexpr (std::is_same_v && (kVecSize == 4)) { + uint4 val; // fits 4 float elements + mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, (float*)(mc_ptr + idx)); + mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, (float*)(mc_ptr + idx)); + } else if constexpr (std::is_same_v && (kVecSize == 2)) { + uint2 val; // fits 2 float elements + mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, (float*)(mc_ptr + idx)); + mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, (float*)(mc_ptr + idx)); + } else if constexpr (std::is_same_v && (kVecSize == 1)) { + uint1 val; // fits 1 float element + mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, (float*)(mc_ptr + idx)); + mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, (float*)(mc_ptr + idx)); + } else if constexpr (std::is_same_v && (kVecSize == 8)) { + uint4 val; // fits 8 cutlass::half_t elements; i.e., 4 half2 elements + mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, (half2*)(mc_ptr + idx)); + mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, (half2*)(mc_ptr + idx)); + } else if constexpr (std::is_same_v && (kVecSize == 4)) { + uint2 val; // fits 4 cutlass::half_t elements; i.e., 2 half2 elements + mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, (half2*)(mc_ptr + idx)); + mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, (half2*)(mc_ptr + idx)); + } else if constexpr (std::is_same_v && (kVecSize == 2)) { + uint1 val; // fits 2 cutlass::half_t elements; i.e., 1 half2 element + mscclpp::DeviceMulticastPointerDeviceHandle::multimemLoadReduce(val, (half2*)(mc_ptr + idx)); + mscclpp::DeviceMulticastPointerDeviceHandle::multimemStore(val, (half2*)(mc_ptr + idx)); + } else { + // not supported: cannot use static_assert because of the way TYPE is handled in this file + assert(false); // Unsupported data type and vector size combination + } } // end with a barrier to ensure all devices can now read their values // from their own memory (that is part of the multicast memory) // after writing them in this kernel - barrier(semaphores, tid, bid, num_blocks, nranks); + barrier(semaphores, tid, bid, num_blocks, num_ranks); +} + +extern "C" __global__ void __launch_bounds__(1024, 1) + allreduce6(mscclpp::SmDevice2DeviceSemaphoreDeviceHandle* semaphores, + mscclpp::DeviceMulticastPointerDeviceHandle nvlsPtrs, int my_rank, int num_ranks, size_t num_elements, + size_t vector_size) { + if (vector_size == 8) { + allreduce6_helper(semaphores, nvlsPtrs, my_rank, num_ranks, num_elements); + } else if (vector_size == 4) { + allreduce6_helper(semaphores, nvlsPtrs, my_rank, num_ranks, num_elements); + } else if (vector_size == 2) { + allreduce6_helper(semaphores, nvlsPtrs, my_rank, num_ranks, num_elements); + } else { + allreduce6_helper(semaphores, nvlsPtrs, my_rank, num_ranks, num_elements); + } } #endif diff --git a/python/mscclpp_benchmark/allreduce_bench.py b/python/mscclpp_benchmark/allreduce_bench.py index d9d4731b3..69e4f3adc 100644 --- a/python/mscclpp_benchmark/allreduce_bench.py +++ b/python/mscclpp_benchmark/allreduce_bench.py @@ -175,7 +175,7 @@ def run_benchmark( MscclppAllReduce1(mscclpp_group, memory), MscclppAllReduce3(mscclpp_group, memory, proxy_service), ] - if is_nvls_supported(): + if is_nvls_supported() and (data_type == cp.float32 or data_type == cp.float16): mscclpp_algos.append(MscclppAllReduce6(mscclpp_group, nelem, data_type)) else: if memory.nbytes < 2**22: diff --git a/python/mscclpp_benchmark/mscclpp_op.py b/python/mscclpp_benchmark/mscclpp_op.py index 6068a9104..706107bef 100644 --- a/python/mscclpp_benchmark/mscclpp_op.py +++ b/python/mscclpp_benchmark/mscclpp_op.py @@ -468,7 +468,16 @@ def __init__( self.device_handles_cp = cp.asarray(memoryview(b"".join(self.device_handles)), dtype=cp.uint8) self.nvls_handle = self.nvls_mem_handle.device_handle().raw - self.set_params(nblocks, block_size) + if self.memory.dtype != cp.float16 and self.memory.dtype != cp.float32: + raise RuntimeError("Unsupported data type") + + if self.memory.dtype == cp.float16: + vector_size = 8 + elif self.memory.dtype == cp.float32: + vector_size = 4 + else: + vector_size = 1 + self.set_params(nblocks, block_size, vector_size) def get_memory(self): return self.memory @@ -477,23 +486,31 @@ def __call__(self, stream_ptr): self.kernel.launch_kernel(self.params, self.nblocks, self.block_size, 0, stream_ptr) return self.memory - def set_params(self, nblocks, block_size): + def set_params(self, nblocks, block_size, vector_size): self.nblocks = nblocks self.block_size = block_size + self.vector_size = vector_size self.params = b"" self.params += pack( self.device_handles_cp, self.nvls_handle, - self.memory, self.group.my_rank, self.group.nranks, ctypes.c_size_t(self.memory.size), + self.vector_size, ) def auto_tune(self): nblocks_to_try = [8, 12, 16, 24, 32, 48, 64, 72, 96, 108] block_size_to_try = [256, 512, 1024] + if self.memory.dtype == cp.float16: + vector_size_to_try = [8, 4, 2] + elif self.memory.dtype == cp.float32: + vector_size_to_try = [4, 2, 1] + else: + vector_size_to_try = [1] for nblocks in nblocks_to_try: for block_size in block_size_to_try: - self.set_params(nblocks, block_size) - yield nblocks, block_size + for vector_size in vector_size_to_try: + self.set_params(nblocks, block_size, vector_size) + yield nblocks, block_size, vector_size diff --git a/python/test/executor_test.py b/python/test/executor_test.py index 3a0bd2d74..23c3ff483 100644 --- a/python/test/executor_test.py +++ b/python/test/executor_test.py @@ -10,6 +10,7 @@ npkit, ) import mscclpp.comm as mscclpp_comm +import os import cupy as cp from mpi4py import MPI diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py index 4af3ddb36..9535c869f 100644 --- a/python/test/test_mscclpp.py +++ b/python/test/test_mscclpp.py @@ -613,7 +613,10 @@ def test_executor(mpi_group: MpiGroup, filename: str): cp.random.seed(42) buffer = cp.random.random(nelems).astype(cp.float16) sub_arrays = cp.split(buffer, mpi_group.comm.size) - sendbuf = sub_arrays[mpi_group.comm.rank] + nelems_per_rank = int(nelems / mpi_group.comm.size) + sendbuf = cp.empty(nelems_per_rank).astype(cp.float16) + for i in range(nelems_per_rank): + sendbuf[i] = sub_arrays[mpi_group.comm.rank][i] expected = cp.zeros_like(sendbuf) for i in range(mpi_group.comm.size): expected += sub_arrays[i] diff --git a/src/connection.cc b/src/connection.cc index fc3724c08..57e77b40b 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -6,13 +6,13 @@ #if defined(ENABLE_NPKIT) #include #endif + #include #include #include #include "debug.h" #include "endpoint.hpp" -#include "infiniband/verbs.h" namespace mscclpp { @@ -173,9 +173,9 @@ void IBConnection::flush(int64_t timeoutUsec) { } } for (int i = 0; i < wcNum; ++i) { - const ibv_wc* wc = qp->getWc(i); - if (wc->status != IBV_WC_SUCCESS) { - throw mscclpp::IbError("a work item failed: status " + std::to_string(wc->status), wc->status); + int status = qp->getWcStatus(i); + if (status != static_cast(WsStatus::Success)) { + throw mscclpp::IbError("a work item failed: status " + std::to_string(status), status); } } } diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index 06079f439..ed0fdc505 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -49,6 +49,16 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else ); +#endif + break; + case DataType::BFLOAT16: + executionKernel<__bfloat16><<>>( + rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); #endif break; } diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index e1b84a16c..56eb513e3 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -44,6 +44,8 @@ auto getOpType = [](const std::string& str) { return mscclpp::OperationType::REDUCE_SEND_PACKET; } else if (str == "cpkt") { return mscclpp::OperationType::COPY_PACKET; + } else if (str == "tpkt") { + return mscclpp::OperationType::TRANSFORM_TO_PACKET; } else if (str == "rpkt") { return mscclpp::OperationType::REDUCE_PACKET; } else { @@ -123,7 +125,7 @@ std::vector ExecutionPlan::Impl::getOperations(int rank, int threadbl int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->operations.at(rank).size(); } -void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize) { +void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset) { std::ifstream file(this->planPath); json obj = json::parse(file); if (this->name != obj["name"]) { @@ -145,7 +147,31 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize) { this->setupChannels(gpus); this->inputSize = inputSize; - this->setupOperations(gpus); + this->setupOperations(gpus, contsSrcOffset, constDstOffset); +} + +void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset) { + std::ifstream file(this->planPath); + json obj = json::parse(file); + if (this->name != obj["name"]) { + throw Error("Plan name does not match", ErrorCode::ExecutorError); + } + std::string protocol = obj["protocol"]; + if (protocol == "LL") { + this->isUsingPacket = true; + } + const auto& gpus = obj["gpus"]; + + for (const auto& gpu : gpus) { + int rank = gpu["id"]; + this->inputChunks[rank] = gpu["inputChunks"]; + this->outputChunks[rank] = gpu["outputChunks"]; + this->scratchChunks[rank] = gpu["scratchChunks"]; + this->chunkGroups[rank] = gpu["chunkGroups"]; + } + + this->inputSize = inputSize; + this->setupOperations(gpus, contsSrcOffset, constDstOffset); } // Construct the channel info. Step 1. Flatten SM and PROXY channels into separate vectors. @@ -201,7 +227,7 @@ void ExecutionPlan::Impl::setupChannels(const json& gpus) { } } -void ExecutionPlan::Impl::setupOperations(const json& gpus) { +void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffset, size_t constDstOffset) { // setup threadblocks and operations for (const auto& gpu : gpus) { int rank = gpu["id"]; @@ -234,7 +260,8 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus) { // Get the relevant channel index in rank channelInfos operation.inputChannelIndexes[i] = channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]]; - operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["i_cids"][i]["off"]); + operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["i_cids"][i]["off"]) + + (srcBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); chunkIndexes.push_back((uint32_t)op["i_cids"][i]["off"]); } } @@ -243,7 +270,8 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus) { operation.nInputs = op["srcs"].size(); operation.inputBufferType = convertToBufferType(op["srcs"][0]["buff"]); for (int i = 0; i < operation.nInputs; i++) { - operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["srcs"][i]["off"]); + operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["srcs"][i]["off"]) + + (operation.inputBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); chunkIndexes.push_back((uint32_t)op["srcs"][i]["off"]); } } @@ -254,7 +282,8 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus) { BufferType dstBufferType = convertToBufferType(op["o_buff"]["dst"]); operation.outputChannelIndexes[i] = channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]]; - operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["o_cids"][i]["off"]); + operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["o_cids"][i]["off"]) + + (dstBufferType != BufferType::SCRATCH ? constDstOffset : 0); chunkIndexes.push_back((uint32_t)op["o_cids"][i]["off"]); } } @@ -263,7 +292,8 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus) { operation.nOutputs = op["dsts"].size(); operation.outputBufferType = convertToBufferType(op["dsts"][0]["buff"]); for (int i = 0; i < operation.nOutputs; i++) { - operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["dsts"][i]["off"]); + operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, (uint32_t)op["dsts"][i]["off"]) + + (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : 0); chunkIndexes.push_back((uint32_t)op["dsts"][i]["off"]); } } @@ -340,6 +370,8 @@ void ExecutionPlan::Impl::reset() { this->chunkGroups.clear(); } +void ExecutionPlan::Impl::operationsReset() { this->operations.clear(); } + ExecutionPlan::ExecutionPlan(const std::string& name, const std::string& planPath) : impl_(std::make_shared(name, planPath)) {} diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 62d749d00..a2278ace2 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -54,6 +54,7 @@ static const mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Trans namespace mscclpp { struct ExecutionContext { + std::shared_ptr proxyService; std::unordered_map> connections; std::unordered_map, mscclpp::RegisteredMemory> registeredMemories; std::vector> smSemaphores; @@ -69,29 +70,36 @@ struct ExecutionContext { struct Executor::Impl { int nranksPerNode; std::shared_ptr comm; - std::shared_ptr proxyService; std::unordered_map contexts; - Impl(std::shared_ptr comm) : comm(comm) { - this->nranksPerNode = comm->bootstrap()->getNranksPerNode(); - this->proxyService = std::make_shared(); - } + Impl(std::shared_ptr comm) : comm(comm) { this->nranksPerNode = comm->bootstrap()->getNranksPerNode(); } ~Impl() = default; - ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t sendBufferSize, + ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t messageSize, + size_t contsSrcOffset, size_t constDstOffset, size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) { ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name}; if (this->contexts.find(key) != this->contexts.end()) { + plan.impl_->operationsReset(); + plan.impl_->lightLoadExecutionPlan(messageSize, contsSrcOffset, constDstOffset); + this->setupDeviceExecutionPlan(this->contexts[key], rank, plan); + this->contexts[key].deviceExecutionPlansBuffer = + allocExtSharedCuda(this->contexts[key].deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan)); + memcpyCuda(this->contexts[key].deviceExecutionPlansBuffer.get(), + (char*)this->contexts[key].deviceExecutionPlans.data(), + this->contexts[key].deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan), cudaMemcpyHostToDevice); return this->contexts[key]; } + plan.impl_->reset(); - plan.impl_->loadExecutionPlan(sendBufferSize); + plan.impl_->loadExecutionPlan(messageSize, contsSrcOffset, constDstOffset); ExecutionContext context; size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize); std::shared_ptr scratchBuffer = allocExtSharedCuda(scratchBufferSize); context.scratchBuffer = scratchBuffer; context.scratchBufferSize = scratchBufferSize; + context.proxyService = std::make_shared(); this->setupConnections(context, rank, plan); this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, rank, plan); @@ -100,6 +108,7 @@ struct Executor::Impl { allocExtSharedCuda(context.deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan)); memcpyCuda(context.deviceExecutionPlansBuffer.get(), (char*)context.deviceExecutionPlans.data(), context.deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan), cudaMemcpyHostToDevice); + context.proxyService->startProxy(); this->contexts.insert({key, context}); return context; } @@ -129,7 +138,8 @@ struct Executor::Impl { for (int peer : info.connectedPeers) { if (!inSameNode(rank, peer, this->nranksPerNode)) { flags |= IBs[rank % this->nranksPerNode]; - } + } else + flags |= Transport::CudaIpc; } } } @@ -172,6 +182,16 @@ struct Executor::Impl { comm->setup(); for (size_t i = 0; i < remoteRegMemoryFutures.size(); i++) { context.registeredMemories[{bufferType, connectedPeers[i]}] = std::move(remoteRegMemoryFutures[i].get()); + CUdeviceptr myRegBaseAdr, peerRegBaseAdr; + size_t temp; + MSCCLPP_CUTHROW(cuMemGetAddressRange(&myRegBaseAdr, &temp, (CUdeviceptr)(char*)memory.data())); + MSCCLPP_CUTHROW(cuMemGetAddressRange( + &peerRegBaseAdr, &temp, + (CUdeviceptr)(char*)context.registeredMemories[{bufferType, connectedPeers[i]}].data())); + size_t myRegOffset = (char*)memory.data() - (char*)myRegBaseAdr; + size_t peerRegOffset = + (char*)context.registeredMemories[{bufferType, connectedPeers[i]}].data() - (char*)peerRegBaseAdr; + if (myRegOffset != peerRegOffset) throw Error("Divergent data offset between peers", ErrorCode::ExecutorError); } } } @@ -190,7 +210,7 @@ struct Executor::Impl { std::make_shared(*this->comm, context.connections.at(peer))); } else if (channelType == ChannelType::PROXY) { proxySemaphores.push_back( - this->proxyService->buildAndAddSemaphore(*this->comm, context.connections.at(peer))); + context.proxyService->buildAndAddSemaphore(*this->comm, context.connections.at(peer))); } } } @@ -224,9 +244,9 @@ struct Executor::Impl { context.registeredMemories[{info.dstBufferType, peer}], src, nullptr); } else if (channelType == ChannelType::PROXY) { context.proxyChannels.emplace_back( - this->proxyService->proxyChannel(context.proxySemaphores[index++]), - this->proxyService->addMemory(context.registeredMemories[{info.dstBufferType, peer}]), - this->proxyService->addMemory(localMemory)); + context.proxyService->proxyChannel(context.proxySemaphores[index++]), + context.proxyService->addMemory(context.registeredMemories[{info.dstBufferType, peer}]), + context.proxyService->addMemory(localMemory)); } } } @@ -295,13 +315,20 @@ struct Executor::Impl { Executor::Executor(std::shared_ptr comm) : impl_(std::make_unique(comm)) {} -void Executor::execute(int rank, void* sendbuff, void* recvBuff, size_t sendBuffSize, size_t recvBuffSize, +void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize, size_t recvBuffSize, DataType dataType, int nthreads, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType) { - ExecutionContext context = - this->impl_->setupExecutionContext(rank, sendbuff, recvBuff, sendBuffSize, recvBuffSize, plan); - // TODO(binyli): need to flush proxy channel here this->impl_->proxyService->startProxy(); - this->impl_->launchKernel(context, rank, nthreads, sendbuff, recvBuff, dataType, stream, packetType); + size_t sendBytes, recvBytes; + CUdeviceptr sendBasePtr, recvBasePtr; + MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff)); + MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff)); + size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr; + size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr; + + ExecutionContext context = this->impl_->setupExecutionContext( + rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, offsetIn, offsetOut, sendBytes, recvBytes, plan); + // TODO(binyli): need to flush proxy channel here + this->impl_->launchKernel(context, rank, nthreads, sendbuff, recvbuff, dataType, stream, packetType); } Executor::~Executor() = default; diff --git a/src/ib.cc b/src/ib.cc index 9955c5269..d9d72d1a3 100644 --- a/src/ib.cc +++ b/src/ib.cc @@ -3,7 +3,6 @@ #include "ib.hpp" -#include #include #include @@ -15,7 +14,11 @@ #include #include "api.h" +#include "context.hpp" #include "debug.h" +#if defined(USE_IBVERBS) +#include "ibverbs_wrapper.hpp" +#endif // defined(USE_IBVERBS) #if !defined(__HIP_PLATFORM_AMD__) @@ -33,6 +36,8 @@ static bool checkNvPeerMemLoaded() { namespace mscclpp { +#if defined(USE_IBVERBS) + IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size) : buff(buff) { if (size == 0) { throw std::invalid_argument("invalid size: " + std::to_string(size)); @@ -43,9 +48,9 @@ IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size) : buff(buff) { } uintptr_t addr = reinterpret_cast(buff) & -pageSize; std::size_t pages = (size + (reinterpret_cast(buff) - addr) + pageSize - 1) / pageSize; - this->mr = ibv_reg_mr(pd, reinterpret_cast(addr), pages * pageSize, - IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | - IBV_ACCESS_RELAXED_ORDERING | IBV_ACCESS_REMOTE_ATOMIC); + this->mr = IBVerbs::ibv_reg_mr2(pd, reinterpret_cast(addr), pages * pageSize, + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_RELAXED_ORDERING | IBV_ACCESS_REMOTE_ATOMIC); if (this->mr == nullptr) { std::stringstream err; err << "ibv_reg_mr failed (errno " << errno << ")"; @@ -54,7 +59,7 @@ IbMr::IbMr(ibv_pd* pd, void* buff, std::size_t size) : buff(buff) { this->size = pages * pageSize; } -IbMr::~IbMr() { ibv_dereg_mr(this->mr); } +IbMr::~IbMr() { IBVerbs::ibv_dereg_mr(this->mr); } IbMrInfo IbMr::getInfo() const { IbMrInfo info; @@ -70,7 +75,7 @@ uint32_t IbMr::getLkey() const { return this->mr->lkey; } IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr, int maxWrPerSend) : numSignaledPostedItems(0), numSignaledStagedItems(0), maxCqPollNum(maxCqPollNum), maxWrPerSend(maxWrPerSend) { - this->cq = ibv_create_cq(ctx, maxCqSize, nullptr, nullptr, 0); + this->cq = IBVerbs::ibv_create_cq(ctx, maxCqSize, nullptr, nullptr, 0); if (this->cq == nullptr) { std::stringstream err; err << "ibv_create_cq failed (errno " << errno << ")"; @@ -89,7 +94,7 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollN qpInitAttr.cap.max_recv_sge = 1; qpInitAttr.cap.max_inline_data = 0; - struct ibv_qp* _qp = ibv_create_qp(pd, &qpInitAttr); + struct ibv_qp* _qp = IBVerbs::ibv_create_qp(pd, &qpInitAttr); if (_qp == nullptr) { std::stringstream err; err << "ibv_create_qp failed (errno " << errno << ")"; @@ -97,7 +102,7 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollN } struct ibv_port_attr portAttr; - if (ibv_query_port(ctx, port, &portAttr) != 0) { + if (IBVerbs::ibv_query_port_w(ctx, port, &portAttr) != 0) { std::stringstream err; err << "ibv_query_port failed (errno " << errno << ")"; throw mscclpp::IbError(err.str(), errno); @@ -111,7 +116,7 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollN if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND || this->info.is_grh) { union ibv_gid gid; - if (ibv_query_gid(ctx, port, 0, &gid) != 0) { + if (IBVerbs::ibv_query_gid(ctx, port, 0, &gid) != 0) { std::stringstream err; err << "ibv_query_gid failed (errno " << errno << ")"; throw mscclpp::IbError(err.str(), errno); @@ -126,21 +131,21 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollN qpAttr.pkey_index = 0; qpAttr.port_num = port; qpAttr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC; - if (ibv_modify_qp(_qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) { + if (IBVerbs::ibv_modify_qp(_qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) { std::stringstream err; err << "ibv_modify_qp failed (errno " << errno << ")"; throw mscclpp::IbError(err.str(), errno); } this->qp = _qp; this->wrn = 0; - this->wrs = std::make_unique(maxWrPerSend); - this->sges = std::make_unique(maxWrPerSend); - this->wcs = std::make_unique(maxCqPollNum); + this->wrs = std::make_shared>(maxWrPerSend); + this->sges = std::make_shared>(maxWrPerSend); + this->wcs = std::make_shared>(maxCqPollNum); } IbQp::~IbQp() { - ibv_destroy_qp(this->qp); - ibv_destroy_cq(this->cq); + IBVerbs::ibv_destroy_qp(this->qp); + IBVerbs::ibv_destroy_cq(this->cq); } void IbQp::rtr(const IbQpInfo& info) { @@ -167,9 +172,9 @@ void IbQp::rtr(const IbQpInfo& info) { qp_attr.ah_attr.sl = 0; qp_attr.ah_attr.src_path_bits = 0; qp_attr.ah_attr.port_num = info.port; - int ret = ibv_modify_qp(this->qp, &qp_attr, - IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | - IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER); + int ret = IBVerbs::ibv_modify_qp(this->qp, &qp_attr, + IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER); if (ret != 0) { std::stringstream err; err << "ibv_modify_qp failed (errno " << errno << ")"; @@ -186,7 +191,7 @@ void IbQp::rts() { qp_attr.rnr_retry = 7; qp_attr.sq_psn = 0; qp_attr.max_rd_atomic = 1; - int ret = ibv_modify_qp( + int ret = IBVerbs::ibv_modify_qp( this->qp, &qp_attr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC); if (ret != 0) { @@ -204,13 +209,13 @@ IbQp::WrInfo IbQp::getNewWrInfo() { } int wrn = this->wrn; - ibv_send_wr* wr_ = &this->wrs[wrn]; - ibv_sge* sge_ = &this->sges[wrn]; + ibv_send_wr* wr_ = &this->wrs->data()[wrn]; + ibv_sge* sge_ = &this->sges->data()[wrn]; wr_->sg_list = sge_; wr_->num_sge = 1; wr_->next = nullptr; if (wrn > 0) { - this->wrs[wrn - 1].next = wr_; + (*this->wrs)[wrn - 1].next = wr_; } this->wrn++; return IbQp::WrInfo{wr_, sge_}; @@ -265,7 +270,7 @@ void IbQp::postSend() { return; } struct ibv_send_wr* bad_wr; - int ret = ibv_post_send(this->qp, this->wrs.get(), &bad_wr); + int ret = IBVerbs::ibv_post_send(this->qp, this->wrs->data(), &bad_wr); if (ret != 0) { std::stringstream err; err << "ibv_post_send failed (errno " << errno << ")"; @@ -281,16 +286,14 @@ void IbQp::postSend() { } int IbQp::pollCq() { - int wcNum = ibv_poll_cq(this->cq, this->maxCqPollNum, this->wcs.get()); + int wcNum = IBVerbs::ibv_poll_cq(this->cq, this->maxCqPollNum, this->wcs->data()); if (wcNum > 0) { this->numSignaledPostedItems -= wcNum; } return wcNum; } -IbQpInfo& IbQp::getInfo() { return this->info; } - -const ibv_wc* IbQp::getWc(int idx) const { return &this->wcs[idx]; } +int IbQp::getWcStatus(int idx) const { return (*this->wcs)[idx].status; } int IbQp::getNumCqItems() const { return this->numSignaledPostedItems; } @@ -301,20 +304,20 @@ IbCtx::IbCtx(const std::string& devName) : devName(devName) { } #endif // !defined(__HIP_PLATFORM_AMD__) int num; - struct ibv_device** devices = ibv_get_device_list(&num); + struct ibv_device** devices = IBVerbs::ibv_get_device_list(&num); for (int i = 0; i < num; ++i) { if (std::string(devices[i]->name) == devName) { - this->ctx = ibv_open_device(devices[i]); + this->ctx = IBVerbs::ibv_open_device(devices[i]); break; } } - ibv_free_device_list(devices); + IBVerbs::ibv_free_device_list(devices); if (this->ctx == nullptr) { std::stringstream err; err << "ibv_open_device failed (errno " << errno << ", device name << " << devName << ")"; throw mscclpp::IbError(err.str(), errno); } - this->pd = ibv_alloc_pd(this->ctx); + this->pd = IBVerbs::ibv_alloc_pd(this->ctx); if (this->pd == nullptr) { std::stringstream err; err << "ibv_alloc_pd failed (errno " << errno << ")"; @@ -326,16 +329,16 @@ IbCtx::~IbCtx() { this->mrs.clear(); this->qps.clear(); if (this->pd != nullptr) { - ibv_dealloc_pd(this->pd); + IBVerbs::ibv_dealloc_pd(this->pd); } if (this->ctx != nullptr) { - ibv_close_device(this->ctx); + IBVerbs::ibv_close_device(this->ctx); } } bool IbCtx::isPortUsable(int port) const { struct ibv_port_attr portAttr; - if (ibv_query_port(this->ctx, port, &portAttr) != 0) { + if (IBVerbs::ibv_query_port_w(this->ctx, port, &portAttr) != 0) { std::stringstream err; err << "ibv_query_port failed (errno " << errno << ", port << " << port << ")"; throw mscclpp::IbError(err.str(), errno); @@ -346,7 +349,7 @@ bool IbCtx::isPortUsable(int port) const { int IbCtx::getAnyActivePort() const { struct ibv_device_attr devAttr; - if (ibv_query_device(this->ctx, &devAttr) != 0) { + if (IBVerbs::ibv_query_device(this->ctx, &devAttr) != 0) { std::stringstream err; err << "ibv_query_device failed (errno " << errno << ")"; throw mscclpp::IbError(err.str(), errno); @@ -378,11 +381,9 @@ const IbMr* IbCtx::registerMr(void* buff, std::size_t size) { return mrs.back().get(); } -const std::string& IbCtx::getDevName() const { return this->devName; } - MSCCLPP_API_CPP int getIBDeviceCount() { int num; - ibv_get_device_list(&num); + IBVerbs::ibv_get_device_list(&num); return num; } @@ -441,7 +442,7 @@ MSCCLPP_API_CPP std::string getIBDeviceName(Transport ibTransport) { } int num; - struct ibv_device** devices = ibv_get_device_list(&num); + struct ibv_device** devices = IBVerbs::ibv_get_device_list(&num); if (ibTransportIndex >= num) { std::stringstream ss; ss << "IB transport out of range: " << ibTransportIndex << " >= " << num; @@ -452,7 +453,7 @@ MSCCLPP_API_CPP std::string getIBDeviceName(Transport ibTransport) { MSCCLPP_API_CPP Transport getIBTransportByDeviceName(const std::string& ibDeviceName) { int num; - struct ibv_device** devices = ibv_get_device_list(&num); + struct ibv_device** devices = IBVerbs::ibv_get_device_list(&num); for (int i = 0; i < num; ++i) { if (ibDeviceName == devices[i]->name) { switch (i) { // TODO: get rid of this ugly switch @@ -480,4 +481,14 @@ MSCCLPP_API_CPP Transport getIBTransportByDeviceName(const std::string& ibDevice throw std::invalid_argument("IB device not found"); } +#else // !defined(USE_IBVERBS) + +MSCCLPP_API_CPP int getIBDeviceCount() { return 0; } + +MSCCLPP_API_CPP std::string getIBDeviceName(Transport) { return ""; } + +MSCCLPP_API_CPP Transport getIBTransportByDeviceName(const std::string&) { return Transport::Unknown; } + +#endif // !defined(USE_IBVERBS) + } // namespace mscclpp diff --git a/src/include/execution_common.hpp b/src/include/execution_common.hpp index 7879af787..c37a18eea 100644 --- a/src/include/execution_common.hpp +++ b/src/include/execution_common.hpp @@ -25,6 +25,7 @@ enum class ChannelType : uint8_t { PROXY, }; +// NOTE(chhwang): any modification here requires corresponding updates in `tools/npkit/npkit_trace_generator.py`. enum class OperationType : uint8_t { BARRIER, PUT, @@ -32,6 +33,7 @@ enum class OperationType : uint8_t { GET, COPY, COPY_PACKET, + TRANSFORM_TO_PACKET, SIGNAL, WAIT, FLUSH, diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index e781daa38..21ea80bfa 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -15,7 +15,7 @@ #include "execution_common.hpp" #if defined(MSCCLPP_DEVICE_COMPILE) -#include +#include "gpu_data_types.hpp" namespace { template @@ -60,6 +60,11 @@ MSCCLPP_DEVICE_INLINE int4 add_vectors<__half>(int4 a, int4 b) { return add_vectors_helper<__half2>(a, b); } +template <> +MSCCLPP_DEVICE_INLINE int4 add_vectors<__bfloat16>(int4 a, int4 b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template MSCCLPP_DEVICE_INLINE uint2 add_vectors_helper(uint2 a, uint2 b) { uint2 ret; @@ -78,6 +83,11 @@ MSCCLPP_DEVICE_INLINE __attribute__((unused)) uint2 add_vectors<__half>(uint2 a, return add_vectors_helper<__half2>(a, b); } +template <> +MSCCLPP_DEVICE_INLINE __attribute__((unused)) uint2 add_vectors<__bfloat16>(uint2 a, uint2 b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template MSCCLPP_DEVICE_INLINE int add_vectors_helper(int a, int b) { return bit_cast(add_elements(bit_cast(a), bit_cast(b))); @@ -93,6 +103,11 @@ MSCCLPP_DEVICE_INLINE __attribute__((unused)) int add_vectors<__half>(int a, int return add_vectors_helper<__half2>(a, b); } +template <> +MSCCLPP_DEVICE_INLINE __attribute__((unused)) int add_vectors<__bfloat16>(int a, int b) { + return add_vectors_helper<__bfloat162>(a, b); +} + template MSCCLPP_DEVICE_INLINE uint32_t add_vectors_helper(uint32_t a, uint32_t b) { return bit_cast(add_elements(bit_cast(a), bit_cast(b))); @@ -108,6 +123,11 @@ MSCCLPP_DEVICE_INLINE uint32_t add_vectors<__half>(uint32_t a, uint32_t b) { return add_vectors_helper<__half2>(a, b); } +template <> +MSCCLPP_DEVICE_INLINE uint32_t add_vectors<__bfloat16>(uint32_t a, uint32_t b) { + return add_vectors_helper<__bfloat162>(a, b); +} + } // namespace #endif // defined(MSCCLPP_DEVICE_COMPILE) @@ -177,10 +197,9 @@ MSCCLPP_DEVICE_INLINE void handlePut(DeviceHandle* smChannel, return; } if (chType == ChannelType::PROXY) { - for (int i = 0; i < count; i++) { - uint32_t dstOffset = dstOffsets[i]; - uint32_t srcOffset = srcOffsets[i]; - proxyChannels[dstChannelIndexes[i]].put(dstOffset, srcOffset, size); + int tid = threadIdx.x; + if (tid < count) { + proxyChannels[dstChannelIndexes[tid]].put(dstOffsets[tid], srcOffsets[tid], size); } } } @@ -234,12 +253,25 @@ MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* output, uint32_t outputOf template MSCCLPP_DEVICE_INLINE void handlePutPacket(size_t scratchSize, DeviceHandle* smChannels, - uint8_t* dstChannelIndexes, uint32_t* dstOffsets, uint32_t* srcOffsets, - int nDstChannels, uint32_t size, uint32_t flag) { + DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, + uint32_t* dstOffsets, uint32_t* srcOffsets, int nDstChannels, uint32_t size, + ChannelType chType, uint32_t flag) { const size_t scratchBaseOffset = flag & 0x1 ? 0 : scratchSize >> 1; - for (int index = 0; index < nDstChannels; ++index) { - smChannels[dstChannelIndexes[index]].putPackets(scratchBaseOffset + dstOffsets[index] * 2, - srcOffsets[index], size, threadIdx.x, blockDim.x, flag); + if (chType == ChannelType::SM) { + for (int index = 0; index < nDstChannels; ++index) { + smChannels[dstChannelIndexes[index]].putPackets( + scratchBaseOffset + dstOffsets[index] * 2, srcOffsets[index], size, threadIdx.x, blockDim.x, flag); + } + } + if (chType == ChannelType::PROXY) { + int tid = threadIdx.x; + if (tid >= nDstChannels) { + return; + } + // For proxy channel, we assume src and dst are in packet format + uint32_t dstOffset = (dstOffsets[tid] << 1) + scratchBaseOffset; + uint32_t srcOffset = (srcOffsets[tid] << 1) + scratchBaseOffset; + proxyChannels[dstChannelIndexes[tid]].put(dstOffset, srcOffset, size << 1); } } @@ -278,8 +310,8 @@ MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* dst, uint32_t dstOffsetByBy template MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, size_t srcSize, uint32_t dstOffset, uint32_t srcOffset, size_t size, uint32_t flag) { - const size_t outputScratchBaseOffset = flag & 0x1 ? 0 : srcSize >> 1; - PacketType* srcPackets = (PacketType*)((char*)src + outputScratchBaseOffset + 2 * srcOffset); + const size_t inputScratchBaseOffset = flag & 0x1 ? 0 : srcSize >> 1; + PacketType* srcPackets = (PacketType*)((char*)src + inputScratchBaseOffset + 2 * srcOffset); PacketPayload* result = (PacketPayload*)((char*)dst + dstOffset); size_t nPackets = size * 2 / sizeof(PacketType); for (size_t idx = threadIdx.x; idx < nPackets; idx += blockDim.x) { @@ -288,6 +320,14 @@ MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, size_t srcSize } } +template +MSCCLPP_DEVICE_INLINE void handleTransformToPacket(void* dst, void* src, size_t dstSize, uint32_t dstOffset, + uint32_t srcOffset, size_t size, uint32_t flag) { + const size_t outputScratchBaseOffset = flag & 0x1 ? 0 : dstSize >> 1; + dstOffset = dstOffset * 2 + outputScratchBaseOffset; + mscclpp::putPackets(dst, dstOffset, src, srcOffset, size, threadIdx.x, blockDim.x, flag); +} + template MSCCLPP_DEVICE_INLINE void handleReduceSend(T* dst, uint32_t dstOffsetByBytes, T* src, uint32_t srcOffsetByBytes, T* input, uint32_t* inputOffsets, DeviceHandle* smChannels, @@ -413,12 +453,13 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu } else if (op.type == OperationType::READ_REDUCE_COPY) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); + handleReadReduceCopySend(dst, op.dstOffset, src, op.srcOffset, smChannels, op.outputChannelIndexes, op.inputChannelIndexes, op.outputOffsets, op.inputOffsets, op.nOutputs, op.nInputs, op.size, false); } else if (op.type == OperationType::PUT_PACKET) { - handlePutPacket(scratchSize, smChannels, op.outputChannelIndexes, op.outputOffsets, op.inputOffsets, - op.nOutputs, op.size, flag); + handlePutPacket(scratchSize, smChannels, proxyChannels, op.outputChannelIndexes, op.outputOffsets, + op.inputOffsets, op.nOutputs, op.size, op.channelType, flag); } else if (op.type == OperationType::REDUCE_SEND_PACKET) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); @@ -435,6 +476,10 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); handleCopyPacket(dst, src, scratchSize, op.dstOffset, op.srcOffset, op.size, flag); + } else if (op.type == OperationType::TRANSFORM_TO_PACKET) { + T* dst = getBuffer(input, output, scratch, op.dstBufferType); + T* src = getBuffer(input, output, scratch, op.srcBufferType); + handleTransformToPacket(dst, src, scratchSize, op.dstOffset, op.srcOffset, op.size, flag); } else if (op.type == OperationType::REDUCE_SEND) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); @@ -501,6 +546,16 @@ class ExecutionKernel { NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); #else ); +#endif + break; + case DataType::BFLOAT16: + executionKernel<__bfloat16, PacketType><<>>( + rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, scratchSize, plan, flag +#if defined(ENABLE_NPKIT) + , + NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); +#else + ); #endif break; } diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 5e008c2ca..86545a0ee 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -57,11 +57,13 @@ struct ExecutionPlan::Impl { std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; - void loadExecutionPlan(size_t inputSize); + void loadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset); + void lightLoadExecutionPlan(size_t inputSize, size_t contsSrcOffset, size_t constDstOffset); void setupChannels(const nlohmann::json& gpus); - void setupOperations(const nlohmann::json& gpus); + void setupOperations(const nlohmann::json& gpus, size_t contsSrcOffset, size_t constDstOffset); void reset(); + void operationsReset(); const std::string name; const std::string planPath; diff --git a/include/mscclpp/gpu_data_types.hpp b/src/include/gpu_data_types.hpp similarity index 70% rename from include/mscclpp/gpu_data_types.hpp rename to src/include/gpu_data_types.hpp index 224e56de6..8d2a6fc79 100644 --- a/include/mscclpp/gpu_data_types.hpp +++ b/src/include/gpu_data_types.hpp @@ -9,6 +9,10 @@ #include #include +using __bfloat16 = __hip_bfloat16; +using __bfloat162 = __hip_bfloat162; +#define __CUDA_BF16_TYPES_EXIST__ + #else #include @@ -19,6 +23,9 @@ #include #endif +using __bfloat16 = __nv_bfloat16; +using __bfloat162 = __nv_bfloat162; + #endif #endif // MSCCLPP_GPU_DATA_TYPES_HPP_ diff --git a/src/include/ib.hpp b/src/include/ib.hpp index 0ea661617..4d8529988 100644 --- a/src/include/ib.hpp +++ b/src/include/ib.hpp @@ -6,6 +6,7 @@ #include #include +#include #include // Forward declarations of IB structures @@ -27,11 +28,11 @@ struct IbMrInfo { class IbMr { public: - ~IbMr(); + virtual ~IbMr(); - IbMrInfo getInfo() const; - const void* getBuff() const; - uint32_t getLkey() const; + virtual IbMrInfo getInfo() const; + virtual const void* getBuff() const; + virtual uint32_t getLkey() const; private: IbMr(ibv_pd* pd, void* buff, std::size_t size); @@ -55,24 +56,33 @@ struct IbQpInfo { bool is_grh; }; +enum class WsStatus { + Success, +}; + class IbQp { public: - ~IbQp(); - - void rtr(const IbQpInfo& info); - void rts(); - void stageSend(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, - uint64_t dstOffset, bool signaled); - void stageAtomicAdd(const IbMr* mr, const IbMrInfo& info, uint64_t wrId, uint64_t dstOffset, uint64_t addVal, - bool signaled); - void stageSendWithImm(const IbMr* mr, const IbMrInfo& info, uint32_t size, uint64_t wrId, uint64_t srcOffset, - uint64_t dstOffset, bool signaled, unsigned int immData); - void postSend(); - int pollCq(); - - IbQpInfo& getInfo(); - const ibv_wc* getWc(int idx) const; - int getNumCqItems() const; + virtual ~IbQp(); + + virtual void rtr([[maybe_unused]] const IbQpInfo& info); + virtual void rts(); + virtual void stageSend([[maybe_unused]] const IbMr* mr, [[maybe_unused]] const IbMrInfo& info, + [[maybe_unused]] uint32_t size, [[maybe_unused]] uint64_t wrId, + [[maybe_unused]] uint64_t srcOffset, [[maybe_unused]] uint64_t dstOffset, + [[maybe_unused]] bool signaled); + virtual void stageAtomicAdd([[maybe_unused]] const IbMr* mr, [[maybe_unused]] const IbMrInfo& info, + [[maybe_unused]] uint64_t wrId, [[maybe_unused]] uint64_t dstOffset, + [[maybe_unused]] uint64_t addVal, [[maybe_unused]] bool signaled); + virtual void stageSendWithImm([[maybe_unused]] const IbMr* mr, [[maybe_unused]] const IbMrInfo& info, + [[maybe_unused]] uint32_t size, [[maybe_unused]] uint64_t wrId, + [[maybe_unused]] uint64_t srcOffset, [[maybe_unused]] uint64_t dstOffset, + [[maybe_unused]] bool signaled, [[maybe_unused]] unsigned int immData); + virtual void postSend(); + virtual int pollCq(); + + IbQpInfo& getInfo() { return this->info; } + virtual int getWcStatus([[maybe_unused]] int idx) const; + virtual int getNumCqItems() const; private: struct WrInfo { @@ -88,9 +98,9 @@ class IbQp { ibv_qp* qp; ibv_cq* cq; - std::unique_ptr wcs; - std::unique_ptr wrs; - std::unique_ptr sges; + std::shared_ptr> wcs; + std::shared_ptr> wrs; + std::shared_ptr> sges; int wrn; int numSignaledPostedItems; int numSignaledStagedItems; @@ -103,13 +113,24 @@ class IbQp { class IbCtx { public: +#if defined(USE_IBVERBS) IbCtx(const std::string& devName); ~IbCtx(); IbQp* createQp(int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr, int maxWrPerSend, int port = -1); const IbMr* registerMr(void* buff, std::size_t size); - - const std::string& getDevName() const; +#else + IbCtx([[maybe_unused]] const std::string& devName) {} + ~IbCtx() {} + + IbQp* createQp([[maybe_unused]] int maxCqSize, [[maybe_unused]] int maxCqPollNum, [[maybe_unused]] int maxSendWr, + [[maybe_unused]] int maxRecvWr, [[maybe_unused]] int maxWrPerSend, [[maybe_unused]] int port = -1) { + return nullptr; + } + const IbMr* registerMr([[maybe_unused]] void* buff, [[maybe_unused]] std::size_t size) { return nullptr; } +#endif + + const std::string& getDevName() const { return this->devName; }; private: bool isPortUsable(int port) const; diff --git a/src/include/ibverbs_wrapper.hpp b/src/include/ibverbs_wrapper.hpp new file mode 100644 index 000000000..e862cbea3 --- /dev/null +++ b/src/include/ibverbs_wrapper.hpp @@ -0,0 +1,269 @@ +#ifndef MSCCLPP_IBVERBS_WRAPPER_HPP_ +#define MSCCLPP_IBVERBS_WRAPPER_HPP_ + +#include +#include + +#include +#include + +namespace mscclpp { + +struct IBVerbs { + private: + // Static method to initialize the library + static void initialize() { + initialized = true; + handle = dlopen("libibverbs.so", RTLD_NOW); + if (!handle) { + throw mscclpp::IbError("Failed to load libibverbs: " + std::string(dlerror()), errno); + } + + // Load the necessary functions + ibv_get_device_list_lib = (ibv_get_device_list_t)dlsym(handle, "ibv_get_device_list"); + ibv_free_device_list_lib = (ibv_free_device_list_t)dlsym(handle, "ibv_free_device_list"); + ibv_alloc_pd_lib = (ibv_alloc_pd_t)dlsym(handle, "ibv_alloc_pd"); + ibv_dealloc_pd_lib = (ibv_dealloc_pd_t)dlsym(handle, "ibv_dealloc_pd"); + ibv_open_device_lib = (ibv_open_device_t)dlsym(handle, "ibv_open_device"); + ibv_close_device_lib = (ibv_close_device_t)dlsym(handle, "ibv_close_device"); + ibv_query_device_lib = (ibv_query_device_t)dlsym(handle, "ibv_query_device"); + ibv_create_cq_lib = (ibv_create_cq_t)dlsym(handle, "ibv_create_cq"); + ibv_create_qp_lib = (ibv_create_qp_t)dlsym(handle, "ibv_create_qp"); + ibv_destroy_cq_lib = (ibv_destroy_cq_t)dlsym(handle, "ibv_destroy_cq"); + ibv_reg_mr_lib = (ibv_reg_mr_t)dlsym(handle, "ibv_reg_mr"); + ibv_dereg_mr_lib = (ibv_dereg_mr_t)dlsym(handle, "ibv_dereg_mr"); + ibv_query_gid_lib = (ibv_query_gid_t)dlsym(handle, "ibv_query_gid"); + ibv_modify_qp_lib = (ibv_modify_qp_t)dlsym(handle, "ibv_modify_qp"); + ibv_destroy_qp_lib = (ibv_destroy_qp_t)dlsym(handle, "ibv_destroy_qp"); + ibv_query_port_lib = (ibv_query_port_t)dlsym(handle, "ibv_query_port"); + ibv_reg_mr_iova2_lib = (ibv_reg_mr_iova2_t)dlsym(handle, "ibv_reg_mr_iova2"); + + if (!ibv_get_device_list_lib || !ibv_free_device_list_lib || !ibv_alloc_pd_lib || !ibv_dealloc_pd_lib || + !ibv_open_device_lib || !ibv_close_device_lib || !ibv_query_device_lib || !ibv_create_cq_lib || + !ibv_create_qp_lib || !ibv_destroy_cq_lib || !ibv_reg_mr_lib || !ibv_dereg_mr_lib || !ibv_query_gid_lib || + !ibv_reg_mr_iova2_lib || !ibv_modify_qp_lib || !ibv_destroy_qp_lib || !ibv_query_port_lib) { + throw mscclpp::IbError("Failed to load one or more function in the ibibverbs library: " + std::string(dlerror()), + errno); + dlclose(handle); + } + } + + public: + // Static method to get the device list + static struct ibv_device** ibv_get_device_list(int* num_devices) { + if (!initialized) initialize(); + if (ibv_get_device_list_lib) { + return ibv_get_device_list_lib(num_devices); + } + return nullptr; + } + + // Static method to free the device list + static void ibv_free_device_list(struct ibv_device** list) { + if (!initialized) initialize(); + if (ibv_free_device_list_lib) { + ibv_free_device_list_lib(list); + } + } + + // Static method to allocate a protection domain + static struct ibv_pd* ibv_alloc_pd(struct ibv_context* context) { + if (!initialized) initialize(); + if (ibv_alloc_pd_lib) { + return ibv_alloc_pd_lib(context); + } + return nullptr; + } + + // Static method to deallocate a protection domain + static int ibv_dealloc_pd(struct ibv_pd* pd) { + if (!initialized) initialize(); + if (ibv_dealloc_pd_lib) { + return ibv_dealloc_pd_lib(pd); + } + return -1; + } + + // Static method to open a device + static struct ibv_context* ibv_open_device(struct ibv_device* device) { + if (!initialized) initialize(); + if (ibv_open_device_lib) { + return ibv_open_device_lib(device); + } + return nullptr; + } + + // Static method to close a device + static int ibv_close_device(struct ibv_context* context) { + if (!initialized) initialize(); + if (ibv_close_device_lib) { + return ibv_close_device_lib(context); + } + return -1; + } + + // Static method to query a device + static int ibv_query_device(struct ibv_context* context, struct ibv_device_attr* device_attr) { + if (!initialized) initialize(); + if (ibv_query_device_lib) { + return ibv_query_device_lib(context, device_attr); + } + return -1; + } + + // Static method to create a completion queue + static struct ibv_cq* ibv_create_cq(struct ibv_context* context, int cqe, void* cq_context, + struct ibv_comp_channel* channel, int comp_vector) { + if (!initialized) initialize(); + if (ibv_create_cq_lib) { + return ibv_create_cq_lib(context, cqe, cq_context, channel, comp_vector); + } + return nullptr; + } + + // Static method to create a queue pair + static struct ibv_qp* ibv_create_qp(struct ibv_pd* pd, struct ibv_qp_init_attr* qp_init_attr) { + if (!initialized) initialize(); + if (ibv_create_qp_lib) { + return ibv_create_qp_lib(pd, qp_init_attr); + } + return nullptr; + } + + // Static method to destroy a completion queue + static int ibv_destroy_cq(struct ibv_cq* cq) { + if (!initialized) initialize(); + if (ibv_destroy_cq_lib) { + return ibv_destroy_cq_lib(cq); + } + return -1; + } + + // Static method to register a memory region + static struct ibv_mr* ibv_reg_mr2(struct ibv_pd* pd, void* addr, size_t length, int access) { + if (!initialized) initialize(); + if (ibv_reg_mr_lib) { + return ibv_reg_mr_lib(pd, addr, length, access); + } + return nullptr; + } + + // Static method to deregister a memory region + static int ibv_dereg_mr(struct ibv_mr* mr) { + if (!initialized) initialize(); + if (ibv_dereg_mr_lib) { + return ibv_dereg_mr_lib(mr); + } + return -1; + } + + // Static method to query a GID + static int ibv_query_gid(struct ibv_context* context, uint8_t port_num, int index, union ibv_gid* gid) { + if (!initialized) initialize(); + if (ibv_query_gid_lib) { + return ibv_query_gid_lib(context, port_num, index, gid); + } + return -1; + } + + // Static method to modify a queue pair + static int ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int attr_mask) { + if (!initialized) initialize(); + if (ibv_modify_qp_lib) { + return ibv_modify_qp_lib(qp, attr, attr_mask); + } + return -1; + } + + // Static method to destroy a queue pair + static int ibv_destroy_qp(struct ibv_qp* qp) { + if (!initialized) initialize(); + if (ibv_destroy_qp_lib) { + return ibv_destroy_qp_lib(qp); + } + return -1; + } + + static inline int ibv_post_send(struct ibv_qp* qp, struct ibv_send_wr* wr, struct ibv_send_wr** bad_wr) { + if (!initialized) initialize(); + return qp->context->ops.post_send(qp, wr, bad_wr); + } + + static inline int ibv_poll_cq(struct ibv_cq* cq, int num_entries, struct ibv_wc* wc) { + if (!initialized) initialize(); + return cq->context->ops.poll_cq(cq, num_entries, wc); + } + + static int ibv_query_port_w(struct ibv_context* context, uint8_t port_num, struct ibv_port_attr* port_attr) { + if (!initialized) initialize(); + if (ibv_query_port_lib) { + return ibv_query_port_lib(context, port_num, port_attr); + } + return -1; + } + + static struct ibv_mr* ibv_reg_mr_iova2_w(struct ibv_pd* pd, void* addr, size_t length, uint64_t iova, + unsigned int access) { + if (!initialized) initialize(); + if (ibv_reg_mr_iova2_lib) { + return ibv_reg_mr_iova2_lib(pd, addr, length, iova, access); + } + return nullptr; + } + + // Static method to clean up + static void cleanup() { + if (handle) { + dlclose(handle); + handle = nullptr; + } + } + + private: + // Handle for the dynamic library + static inline void* handle = nullptr; + + // Function pointers + typedef struct ibv_device** (*ibv_get_device_list_t)(int*); + typedef void (*ibv_free_device_list_t)(struct ibv_device**); + typedef struct ibv_pd* (*ibv_alloc_pd_t)(struct ibv_context*); + typedef int (*ibv_dealloc_pd_t)(struct ibv_pd*); + typedef struct ibv_context* (*ibv_open_device_t)(struct ibv_device*); + typedef int (*ibv_close_device_t)(struct ibv_context*); + typedef int (*ibv_query_device_t)(struct ibv_context*, struct ibv_device_attr*); + typedef struct ibv_cq* (*ibv_create_cq_t)(struct ibv_context*, int, void*, struct ibv_comp_channel*, int); + typedef struct ibv_qp* (*ibv_create_qp_t)(struct ibv_pd*, struct ibv_qp_init_attr*); + typedef int (*ibv_destroy_cq_t)(struct ibv_cq*); + typedef int (*ibv_destroy_qp_t)(struct ibv_qp*); + typedef struct ibv_mr* (*ibv_reg_mr_t)(struct ibv_pd*, void*, size_t, int); + typedef int (*ibv_dereg_mr_t)(struct ibv_mr*); + typedef int (*ibv_query_gid_t)(struct ibv_context*, uint8_t, int, union ibv_gid*); + typedef int (*ibv_modify_qp_t)(struct ibv_qp*, struct ibv_qp_attr*, int); + typedef int (*ibv_query_port_t)(struct ibv_context*, uint8_t, struct ibv_port_attr*); + typedef struct ibv_mr* (*ibv_reg_mr_iova2_t)(struct ibv_pd* pd, void* addr, size_t length, uint64_t iova, + unsigned int access); + + static inline ibv_get_device_list_t ibv_get_device_list_lib; + static inline ibv_free_device_list_t ibv_free_device_list_lib = nullptr; + static inline ibv_alloc_pd_t ibv_alloc_pd_lib = nullptr; + static inline ibv_dealloc_pd_t ibv_dealloc_pd_lib = nullptr; + static inline ibv_open_device_t ibv_open_device_lib = nullptr; + static inline ibv_close_device_t ibv_close_device_lib = nullptr; + static inline ibv_query_device_t ibv_query_device_lib = nullptr; + static inline ibv_create_cq_t ibv_create_cq_lib = nullptr; + static inline ibv_create_qp_t ibv_create_qp_lib = nullptr; + static inline ibv_destroy_cq_t ibv_destroy_cq_lib = nullptr; + static inline ibv_reg_mr_t ibv_reg_mr_lib = nullptr; + static inline ibv_dereg_mr_t ibv_dereg_mr_lib = nullptr; + static inline ibv_query_gid_t ibv_query_gid_lib = nullptr; + static inline ibv_modify_qp_t ibv_modify_qp_lib = nullptr; + static inline ibv_destroy_qp_t ibv_destroy_qp_lib = nullptr; + static inline ibv_query_port_t ibv_query_port_lib = nullptr; + static inline ibv_reg_mr_iova2_t ibv_reg_mr_iova2_lib = nullptr; + + static inline bool initialized = false; +}; + +} // namespace mscclpp + +#endif // MSCCLPP_IBVERBS_WRAPPER_HPP_ \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ec4fbff4e..96a1ca544 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -3,7 +3,10 @@ find_package(MPI) -set(TEST_LIBS_COMMON mscclpp ${GPU_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES} Threads::Threads) +set(TEST_LIBS_COMMON mscclpp ${GPU_LIBRARIES} ${NUMA_LIBRARIES} Threads::Threads) +if(IBVERBS_FOUND) + list(APPEND TEST_LIBS_COMMON ${IBVERBS_LIBRARIES}) +endif() set(TEST_LIBS_GTEST GTest::gtest_main GTest::gmock_main) set(TEST_INC_COMMON PRIVATE ${PROJECT_SOURCE_DIR}/include SYSTEM PRIVATE ${GPU_INCLUDE_DIRS}) set(TEST_INC_INTERNAL PRIVATE ${PROJECT_SOURCE_DIR}/src/include) @@ -16,6 +19,9 @@ endif() function(add_test_executable name sources) add_executable(${name} ${sources}) target_link_libraries(${name} ${TEST_LIBS_COMMON} MPI::MPI_CXX) + if(IBVERBS_FOUND) + target_compile_definitions(${name} PRIVATE USE_IBVERBS) + endif() target_include_directories(${name} ${TEST_INC_COMMON} ${TEST_INC_INTERNAL}) target_compile_definitions(${name} PRIVATE MSCCLPP_USE_MPI_FOR_TESTS) add_test(NAME ${name} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/run_mpi_test.sh ${name} 2) diff --git a/test/execution-files/sendrecv_packet.json b/test/execution-files/sendrecv_packet.json new file mode 100644 index 000000000..3156b6191 --- /dev/null +++ b/test/execution-files/sendrecv_packet.json @@ -0,0 +1,180 @@ +{ + "name": "send_recv", + "colletive": "sendrecv", + "protocol": "LL", + "inplace": false, + "gpus": [ + { + "id": 0, + "inputChunks": 1, + "outputChunks": 1, + "scratchChunks": 2, + "chunkGroups": 1, + "threadblocks": [ + { + "id": 0, + "ops": [ + { + "name": "tpkt", + "src": 0, + "srcbuff": "i", + "srcoff": 0, + "dst": 0, + "dstbuff": "s", + "dstoff": 0, + "ctype": "none", + "cnt": 1 + }, + { + "name": "nop", + "deps": [ + { + "tb": 0, + "step": 0 + } + ] + }, + { + "name": "ppkt", + "o_buff": { + "src": "s", + "dst": "s" + }, + "o_cids": [ + { + "id": 0, + "off": 1 + } + ], + "srcs": [ + { + "buff": "s", + "off": 0 + } + ], + "ctype": "proxy", + "cnt": 1 + }, + { + "name": "cpkt", + "src": 0, + "srcbuff": "s", + "srcoff": 1, + "dst": 0, + "dstbuff": "o", + "dstoff": 0, + "ctype": "none", + "cnt": 1 + } + ], + "channels": [ + { + "src": "s", + "dst": "s", + "ctype": "proxy", + "cids": [ + 0 + ] + } + ] + } + ], + "channels": [ + { + "srcbuff": "s", + "dstbuff": "s", + "type": "proxy", + "connectedTo": [ + 1 + ] + } + ] + }, + { + "id": 1, + "inputChunks": 1, + "outputChunks": 1, + "scratchChunks": 2, + "chunkGroups": 1, + "threadblocks": [ + { + "id": 0, + "ops": [ + { + "name": "tpkt", + "src": 1, + "srcbuff": "i", + "srcoff": 0, + "dst": 1, + "dstbuff": "s", + "dstoff": 0, + "ctype": "none", + "cnt": 1 + }, + { + "name": "nop", + "deps": [ + { + "tb": 0, + "step": 0 + } + ] + }, + { + "name": "ppkt", + "o_buff": { + "src": "s", + "dst": "s" + }, + "o_cids": [ + { + "id": 0, + "off": 1 + } + ], + "srcs": [ + { + "buff": "s", + "off": 0 + } + ], + "ctype": "proxy", + "cnt": 1 + }, + { + "name": "cpkt", + "src": 1, + "srcbuff": "s", + "srcoff": 1, + "dst": 1, + "dstbuff": "o", + "dstoff": 0, + "ctype": "none", + "cnt": 1 + } + ], + "channels": [ + { + "src": "s", + "dst": "s", + "ctype": "proxy", + "cids": [ + 0 + ] + } + ] + } + ], + "channels": [ + { + "srcbuff": "s", + "dstbuff": "s", + "type": "proxy", + "connectedTo": [ + 0 + ] + } + ] + } + ] +} diff --git a/test/mp_unit/ib_tests.cu b/test/mp_unit/ib_tests.cu index e878154d7..92ee287e2 100644 --- a/test/mp_unit/ib_tests.cu +++ b/test/mp_unit/ib_tests.cu @@ -5,7 +5,6 @@ #include -#include "infiniband/verbs.h" #include "mp_unit_tests.hpp" void IbTestBase::SetUp() { @@ -98,8 +97,8 @@ TEST_F(IbPeerToPeerTest, SimpleSendRecv) { int wcNum = qp->pollCq(); ASSERT_GE(wcNum, 0); for (int i = 0; i < wcNum; ++i) { - const ibv_wc* wc = qp->getWc(i); - EXPECT_EQ(wc->status, IBV_WC_SUCCESS); + int status = qp->getWcStatus(i); + EXPECT_EQ(status, static_cast(mscclpp::WsStatus::Success)); waiting = false; break; } @@ -272,8 +271,8 @@ TEST_F(IbPeerToPeerTest, MemoryConsistency) { wcNum = qp->pollCq(); } ASSERT_EQ(wcNum, 1); - const ibv_wc* wc = qp->getWc(0); - ASSERT_EQ(wc->status, IBV_WC_SUCCESS); + int status = qp->getWcStatus(0); + ASSERT_EQ(status, static_cast(mscclpp::WsStatus::Success)); } // Get the result from the receiver @@ -319,8 +318,8 @@ TEST_F(IbPeerToPeerTest, SimpleAtomicAdd) { int wcNum = qp->pollCq(); ASSERT_GE(wcNum, 0); for (int i = 0; i < wcNum; ++i) { - const ibv_wc* wc = qp->getWc(i); - EXPECT_EQ(wc->status, IBV_WC_SUCCESS); + int status = qp->getWcStatus(i); + EXPECT_EQ(status, static_cast(mscclpp::WsStatus::Success)); waiting = false; break; } diff --git a/tools/npkit/npkit_trace_generator.py b/tools/npkit/npkit_trace_generator.py index 8c15a3ac0..283181a15 100644 --- a/tools/npkit/npkit_trace_generator.py +++ b/tools/npkit/npkit_trace_generator.py @@ -17,6 +17,7 @@ def parse_npkit_event_header(npkit_event_header_path): "GET", "COPY", "COPY_PACKET", + "TRANSFORM_TO_PACKET", "SIGNAL", "WAIT", "FLUSH",