Skip to content

Commit

Permalink
Reduce memory usage for scratch buffer (#403)
Browse files Browse the repository at this point in the history
In the executor, we allocate the scratch buffer based on `sendMemRange`.
However, for certain execution plans, this allocation may be unsuitable,
as the plan does not support messages of this size.

To avoid allocating too much data and causing an OOM error, set the scratch
buffer size to `min(scratchBufferSize(maxMessageSizeSupportedForPlan),
scratchBufferSize(sendMemRange))`.
  • Loading branch information
Binyang2014 authored Dec 13, 2024
1 parent 01fd813 commit ee75caf
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
18 changes: 18 additions & 0 deletions .azure-pipelines/nccl-api-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,24 @@ jobs:
mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_reduce_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
workingDirectory: '$(System.DefaultWorkingDirectory)'

# Runs the NCCL all_gather performance test through mscclpp's NCCL shim
# (LD_PRELOAD of libmscclpp_nccl.so) across the deployed VMSS hosts.
- task: Bash@3
  name: RunNcclGatherTest
  # Fixed: this task runs all_gather_perf, not allreduce — the displayName
  # previously said "Run NCCL Allreduce Test" (copy-paste from the task above).
  displayName: Run NCCL Allgather Test
  inputs:
    targetType: 'inline'
    script: |
      set -e
      HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci
      ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp
      SSH_OPTION="StrictHostKeyChecking=no"
      KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
      parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \
        -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\
        cd /root/mscclpp; \
        mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_gather_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
    workingDirectory: '$(System.DefaultWorkingDirectory)'


- task: AzureCLI@2
name: StopVMSS
displayName: Deallocate VMSS
Expand Down
20 changes: 19 additions & 1 deletion src/executor/execution_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ std::vector<BufferType> ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c
}
return std::vector<BufferType>(bufferTypes.begin(), bufferTypes.end());
}

size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const {
size_t sizePerRank;
size_t sizePerRank = 0;
if (this->inputChunks.at(rank) != 0)
sizePerRank = inputSize / this->inputChunks.at(rank);
else if (this->outputChunks.at(rank) != 0)
Expand All @@ -179,6 +180,23 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz
}
return sizePerRank * this->scratchChunks.at(rank);
}

// Upper bound on the scratch buffer size for this rank, derived from the
// largest message size the plan declares support for. Plans without a limit
// (maxMessageSize == UINT64_MAX) impose no cap.
size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const {
  if (this->maxMessageSize == std::numeric_limits<uint64_t>::max()) {
    return std::numeric_limits<size_t>::max();
  }
  const size_t inChunks = this->inputChunks.at(rank);
  const size_t outChunks = this->outputChunks.at(rank);
  if (inChunks == 0 && outChunks == 0) {
    throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError);
  }
  // Per-chunk size implied by the plan's maximum supported message size;
  // prefer the input chunk count when it is non-zero, as in getScratchBufferSize.
  const size_t chunkSize = (inChunks != 0) ? maxMessageSize / inChunks : maxMessageSize / outChunks;
  // Reconstruct the input/output sizes at maxMessageSize and size the scratch
  // buffer as if that were the message being executed.
  return this->getScratchBufferSize(rank, chunkSize * inChunks, chunkSize * outChunks);
}

std::vector<Operation> ExecutionPlan::Impl::getOperations(int rank, int threadblock) const {
return this->operations.at(rank)[threadblock];
}
Expand Down
20 changes: 11 additions & 9 deletions src/executor/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ struct Executor::Impl {

ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize,
size_t outputMessageSize, size_t constSrcOffset, size_t constDstOffset,
size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) {
ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name};
size_t sendMemRange, size_t recvMemRange, const ExecutionPlan& plan) {
ExecutionContextKey key = {sendbuff, recvbuff, sendMemRange, recvMemRange, plan.impl_->name};
DeviceExecutionPlanKey devicePlanKey = {inputMessageSize, outputMessageSize, constSrcOffset, constDstOffset};
if (this->contexts.find(key) != this->contexts.end()) {
auto& devicePlans = this->contexts[key].deviceExecutionPlans;
Expand All @@ -167,7 +167,9 @@ struct Executor::Impl {
plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, constSrcOffset, constDstOffset);

ExecutionContext context;
size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize);
size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank);
size_t scratchBufferSize =
std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange, recvMemRange), maxScratchBufferSize);
std::shared_ptr<char> scratchBuffer;
if (isNvlsSupported()) {
scratchBuffer = allocSharedPhysicalCuda<char>(scratchBufferSize);
Expand All @@ -179,8 +181,8 @@ struct Executor::Impl {
context.proxyService = std::make_shared<ProxyService>();
context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock();
this->setupConnections(context, rank, plan);
this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
this->setupRegisteredMemories(context, sendbuff, recvbuff, sendMemRange, recvMemRange, rank, plan);
this->setupChannels(context, sendbuff, recvbuff, sendMemRange, recvMemRange, rank, plan);
this->setupNvlsChannels(context, sendbuff, recvbuff, rank, plan);
this->setupDeviceExecutionPlan(context, devicePlanKey, rank, plan);
context.deviceExecutionPlansBuffers[devicePlanKey] =
Expand Down Expand Up @@ -438,16 +440,16 @@ Executor::Executor(std::shared_ptr<Communicator> comm) : impl_(std::make_unique<
void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize,
[[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan,
cudaStream_t stream, PacketType packetType) {
size_t sendBytes, recvBytes;
size_t sendMemRange, recvMemRange;
CUdeviceptr sendBasePtr, recvBasePtr;
MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff));
MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff));
MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendMemRange, (CUdeviceptr)sendbuff));
MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvMemRange, (CUdeviceptr)recvbuff));
size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr;
size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr;

ExecutionContext context =
this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize,
offsetIn, offsetOut, sendBytes, recvBytes, plan);
offsetIn, offsetOut, sendMemRange, recvMemRange, plan);
this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType);
}

Expand Down
1 change: 1 addition & 0 deletions src/include/execution_plan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct ExecutionPlan::Impl {
std::vector<int> getConnectedPeers(int rank) const;
std::vector<BufferType> getConnectedBufferTypes(int rank) const;
size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const;
size_t getMaxScratchBufferSize(int rank) const;
std::vector<Operation> getOperations(int rank, int threadblock) const;
int getThreadblockCount(int rank) const;
int getNThreadsPerBlock() const;
Expand Down

0 comments on commit ee75caf

Please sign in to comment.