diff --git a/.azure-pipelines/integration-test.yml b/.azure-pipelines/integration-test.yml index c8ca7765e..bdb299308 100644 --- a/.azure-pipelines/integration-test.yml +++ b/.azure-pipelines/integration-test.yml @@ -21,7 +21,7 @@ steps: tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda-12.1/compat/lib.real mkdir build && cd build - /tmp/cmake-3.26.4-linux-x86_64/bin/cmake .. + /tmp/cmake-3.26.4-linux-x86_64/bin/cmake -DCMAKE_BUILD_TYPE=Release .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml index 4332379d3..b8ac5721e 100644 --- a/.azure-pipelines/ut.yml +++ b/.azure-pipelines/ut.yml @@ -24,7 +24,7 @@ jobs: tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda-12.1/compat/lib.real mkdir build && cd build - /tmp/cmake-3.26.4-linux-x86_64/bin/cmake .. + /tmp/cmake-3.26.4-linux-x86_64/bin/cmake -DCMAKE_BUILD_TYPE=Release .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 99fd2b543..9841234b9 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -46,7 +46,7 @@ jobs: git config --global --add safe.directory /__w/mscclpp/mscclpp curl -L https://github.com/Kitware/CMake/releases/download/v3.26.4/cmake-3.26.4-linux-x86_64.tar.gz -o /tmp/cmake-3.26.4-linux-x86_64.tar.gz tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp - MPI_HOME=/usr/local/mpi /tmp/cmake-3.26.4-linux-x86_64/bin/cmake . + MPI_HOME=/usr/local/mpi /tmp/cmake-3.26.4-linux-x86_64/bin/cmake -DCMAKE_BUILD_TYPE=Release . make -j - name: Perform CodeQL Analysis diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 2ae182d5e..4cf18f376 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -19,14 +19,14 @@ jobs: uses: actions/checkout@v3 - name: Install ClangFormat - run: sudo apt-get install -y clang-format-12 + run: sudo apt-get install -y clang-format - name: Run cpplint run: | CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*") PYTHONCPPSOURCES=$(find ./python/src/ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)') - clang-format-12 -style=file --verbose --Werror --dry-run ${CPPSOURCES} - clang-format-12 --dry-run ${PYTHONCPPSOURCES} + clang-format -style=file --verbose --Werror --dry-run ${CPPSOURCES} + clang-format --dry-run ${PYTHONCPPSOURCES} spelling: runs-on: ubuntu-20.04 diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c2ff4485..30c0e681b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 3.26) project(mscclpp LANGUAGES CUDA CXX) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CUDA_STANDARD 17) -set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -gencode arch=compute_80,code=sm_80 -gencode arch=compute_90,code=sm_90") option(ENABLE_TRACE "Enable tracing" OFF) option(USE_MPI_FOR_TESTS "Use MPI for tests" ON) @@ -16,6 +15,22 @@ endif() list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) find_package(CUDAToolkit REQUIRED) + +# Set CUDA flags based on the detected CUDA version +if(CUDAToolkit_FOUND) + if(CUDAToolkit_VERSION_MAJOR LESS 11) + message(FATAL_ERROR "CUDA 11 or higher is required but detected ${CUDAToolkit_VERSION}") + endif() + + if(CUDAToolkit_VERSION_MAJOR GREATER_EQUAL 11) + set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -gencode arch=compute_80,code=sm_80") + endif() + + if(CUDAToolkit_VERSION_MAJOR GREATER_EQUAL 12) + set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -gencode arch=compute_90,code=sm_90") + endif() +endif() + find_package(IBVerbs REQUIRED) find_package(NUMA REQUIRED) if(USE_MPI_FOR_TESTS) diff --git a/TODO.md b/TODO.md deleted file mode 100644 index 677b46cfd..000000000 --- a/TODO.md +++ /dev/null @@ -1,8 +0,0 @@ -# Core API extraction - -- Add a test for host side Communicator/RegisteredMemory/Connection use. -- Implement a standalone "epoch" synchronization construct that can be used as a component in custom proxies. epoch.hpp/cc has the beginnings of this. -- Reimplement the "standard" proxy service + DeviceConnection on top of the new Communicator/RegisteredMemory/Connection core API. Remants of the old code is in channel.hpp, basic_proxy_handler.hpp/cc and host_connection.hpp/cc. Probably need a manager class to wrap all of this. -- Change the new IBConnection and Communicator to use the new C++ IbCtx and IbQp classes. -- Implement IbQp::~IbQp() -- Fix RegisteredMemory::Impl::Impl to get the IPC handle from the base pointer, not the derived pointer. \ No newline at end of file diff --git a/include/mscclpp/channel.hpp b/include/mscclpp/channel.hpp index a012bf063..8d10c5e7f 100644 --- a/include/mscclpp/channel.hpp +++ b/include/mscclpp/channel.hpp @@ -157,31 +157,19 @@ struct DeviceChannel { DeviceProxyFifo fifo_; }; -class DeviceChannelService; - -inline ProxyHandler makeChannelProxyHandler(DeviceChannelService& channelService); - class DeviceChannelService { public: DeviceChannelService(Communicator& communicator); - ChannelId addChannel(std::shared_ptr connection) { - channels_.push_back(Channel(communicator_, connection)); - return channels_.size() - 1; - } + ChannelId addChannel(std::shared_ptr connection); - MemoryId addMemory(RegisteredMemory memory) { - memories_.push_back(memory); - return memories_.size() - 1; - } + MemoryId addMemory(RegisteredMemory memory); - Channel channel(ChannelId id) { return channels_[id]; } - DeviceChannel deviceChannel(ChannelId id) { - return DeviceChannel(id, channels_[id].epoch().deviceHandle(), proxy_.fifo().deviceFifo()); - } + Channel channel(ChannelId id) const; + DeviceChannel deviceChannel(ChannelId id); - void startProxy() { proxy_.start(); } - void stopProxy() { proxy_.stop(); } + void startProxy(); + void stopProxy(); private: Communicator& communicator_; @@ -192,29 +180,7 @@ class DeviceChannelService { void bindThread(); - ProxyHandlerResult handleTrigger(ProxyTrigger triggerRaw) { - ChannelTrigger* trigger = reinterpret_cast(&triggerRaw); - Channel& channel = channels_[trigger->fields.chanId]; - - auto result = ProxyHandlerResult::Continue; - - if (trigger->fields.type & TriggerData) { - RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; - RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; - channel.connection().write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, trigger->fields.size); - } - - if (trigger->fields.type & TriggerFlag) { - channel.epoch().signal(); - } - - if (trigger->fields.type & TriggerSync) { - channel.connection().flush(); - result = ProxyHandlerResult::FlushFifoTailAndContinue; - } - - return result; - } + ProxyHandlerResult handleTrigger(ProxyTrigger triggerRaw); }; struct SimpleDeviceChannel { diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index b6249bfd8..c7373c21f 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -31,18 +31,8 @@ class BaseBootstrap { virtual void allGather(void* allData, int size) = 0; virtual void barrier() = 0; - // TODO: move implementations of these helpers out of this header - void send(const std::vector& data, int peer, int tag) { - size_t size = data.size(); - send((void*)&size, sizeof(size_t), peer, tag); - send((void*)data.data(), data.size(), peer, tag + 1); - } - void recv(std::vector& data, int peer, int tag) { - size_t size; - recv((void*)&size, sizeof(size_t), peer, tag); - data.resize(size); - recv((void*)data.data(), data.size(), peer, tag + 1); - } + void send(const std::vector& data, int peer, int tag); + void recv(std::vector& data, int peer, int tag); }; class Bootstrap : public BaseBootstrap { diff --git a/include/mscclpp/cuda_utils.hpp b/include/mscclpp/cuda_utils.hpp index 3e232247d..ffe319cfd 100644 --- a/include/mscclpp/cuda_utils.hpp +++ b/include/mscclpp/cuda_utils.hpp @@ -162,4 +162,4 @@ void memcpyCuda(T* dst, const T* src, size_t count, cudaMemcpyKind kind = cudaMe } // namespace mscclpp -#endif // MSCCLPP_CUDA_UTILS_HPP_ \ No newline at end of file +#endif // MSCCLPP_CUDA_UTILS_HPP_ diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index 2f764762d..9092689a4 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -45,6 +45,19 @@ struct ExtInfo { mscclppSocketAddress extAddressListen; }; +MSCCLPP_API_CPP void BaseBootstrap::send(const std::vector& data, int peer, int tag) { + size_t size = data.size(); + send((void*)&size, sizeof(size_t), peer, tag); + send((void*)data.data(), data.size(), peer, tag + 1); +} + +MSCCLPP_API_CPP void BaseBootstrap::recv(std::vector& data, int peer, int tag) { + size_t size; + recv((void*)&size, sizeof(size_t), peer, tag); + data.resize(size); + recv((void*)data.data(), data.size(), peer, tag + 1); +} + struct UniqueIdInternal { uint64_t magic; union mscclppSocketAddress addr; diff --git a/src/channel.cc b/src/channel.cc index d2ba3c34f..3534c40e8 100644 --- a/src/channel.cc +++ b/src/channel.cc @@ -16,6 +16,26 @@ MSCCLPP_API_CPP DeviceChannelService::DeviceChannelService(Communicator& communi deviceNumaNode = getDeviceNumaNode(cudaDevice); } +MSCCLPP_API_CPP ChannelId DeviceChannelService::addChannel(std::shared_ptr connection) { + channels_.push_back(Channel(communicator_, connection)); + return channels_.size() - 1; +} + +MSCCLPP_API_CPP MemoryId DeviceChannelService::addMemory(RegisteredMemory memory) { + memories_.push_back(memory); + return memories_.size() - 1; +} + +MSCCLPP_API_CPP Channel DeviceChannelService::channel(ChannelId id) const { return channels_[id]; } + +MSCCLPP_API_CPP DeviceChannel DeviceChannelService::deviceChannel(ChannelId id) { + return DeviceChannel(id, channels_[id].epoch().deviceHandle(), proxy_.fifo().deviceFifo()); +} + +MSCCLPP_API_CPP void DeviceChannelService::startProxy() { proxy_.start(); } + +MSCCLPP_API_CPP void DeviceChannelService::stopProxy() { proxy_.stop(); } + MSCCLPP_API_CPP void DeviceChannelService::bindThread() { if (deviceNumaNode >= 0) { numaBind(deviceNumaNode); @@ -23,5 +43,29 @@ MSCCLPP_API_CPP void DeviceChannelService::bindThread() { } } +ProxyHandlerResult DeviceChannelService::handleTrigger(ProxyTrigger triggerRaw) { + ChannelTrigger* trigger = reinterpret_cast(&triggerRaw); + Channel& channel = channels_[trigger->fields.chanId]; + + auto result = ProxyHandlerResult::Continue; + + if (trigger->fields.type & TriggerData) { + RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; + RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; + channel.connection().write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, trigger->fields.size); + } + + if (trigger->fields.type & TriggerFlag) { + channel.epoch().signal(); + } + + if (trigger->fields.type & TriggerSync) { + channel.connection().flush(); + result = ProxyHandlerResult::FlushFifoTailAndContinue; + } + + return result; +} + } // namespace channel } // namespace mscclpp diff --git a/src/include/basic_proxy_handler.hpp b/src/include/basic_proxy_handler.hpp deleted file mode 100644 index 2d22a309b..000000000 --- a/src/include/basic_proxy_handler.hpp +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef MSCCLPP_BASIC_PROXY_SERVICE_HPP_ -#define MSCCLPP_BASIC_PROXY_SERVICE_HPP_ - -#include - -#include "communicator.hpp" - -namespace mscclpp { - -ProxyHandler makeBasicProxyHandler(Communicator::Impl& comm); - -} - -#endif \ No newline at end of file diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp index 4c2061102..b370a8814 100644 --- a/src/include/registered_memory.hpp +++ b/src/include/registered_memory.hpp @@ -39,14 +39,7 @@ struct RegisteredMemory::Impl { Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl); Impl(const std::vector& data); - TransportInfo& getTransportInfo(Transport transport) { - for (auto& entry : transportInfos) { - if (entry.transport == transport) { - return entry; - } - } - throw Error("Transport data not found", ErrorCode::InternalError); - } + const TransportInfo& getTransportInfo(Transport transport) const; }; } // namespace mscclpp diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 578af286d..2b5713929 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -141,4 +141,13 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { } } +const TransportInfo& RegisteredMemory::Impl::getTransportInfo(Transport transport) const { + for (auto& entry : transportInfos) { + if (entry.transport == transport) { + return entry; + } + } + throw Error("Transport data not found", ErrorCode::InternalError); +} + } // namespace mscclpp diff --git a/test/mp_unit_tests.cu b/test/mp_unit_tests.cu index 377625a87..9d1eafbf7 100644 --- a/test/mp_unit_tests.cu +++ b/test/mp_unit_tests.cu @@ -671,12 +671,8 @@ class ChannelOneToOneTest : public CommunicatorTestBase { mscclpp::channel::ChannelId cid = channelService->addChannel(connections[r]); communicator->setup(); - // TODO: enable this when we support out-of-place - // devChannels.emplace_back(channelService->deviceChannel(cid), - // channelService->addMemory(remoteMemory), channelService->addMemory(sendMemory), - // remoteMemory.data(), sendMemory.data(), tmpBuff); devChannels.emplace_back(channelService->deviceChannel(cid), channelService->addMemory(remoteMemory), - channelService->addMemory(sendMemory), remoteMemory.data(), sendMemory.data()); + channelService->addMemory(sendMemory), remoteMemory.data(), sendMemory.data(), tmpBuff); } } diff --git a/test/mscclpp-test/common.cu b/test/mscclpp-test/common.cu index 0c37acd2e..1893937ca 100644 --- a/test/mscclpp-test/common.cu +++ b/test/mscclpp-test/common.cu @@ -72,11 +72,8 @@ double allreduceTime(int worldSize, double value, int average) { double accumulator = value; if (average != 0) { - MPI_Op op = average == 1 ? MPI_SUM - : average == 2 ? MPI_MIN - : average == 3 ? MPI_MAX - : average == 4 ? MPI_SUM - : MPI_Op(); + MPI_Op op = + average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : average == 3 ? MPI_MAX : average == 4 ? MPI_SUM : MPI_Op(); MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, MPI_DOUBLE, op, MPI_COMM_WORLD); }