
Commit

fix bug
yzygitzh committed Oct 23, 2024
1 parent 59d0917 commit ab77c07
Showing 4 changed files with 58 additions and 47 deletions.
48 changes: 30 additions & 18 deletions src/executor/execution_plan.cc
@@ -148,7 +148,8 @@ std::vector<BufferType> ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c
 }
 return std::vector<BufferType>(bufferTypes.begin(), bufferTypes.end());
 }
-size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const {
+
+void ExecutionPlan::Impl::calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag) {
 size_t sizePerRank;
 if (this->inputChunks.at(rank) != 0)
 sizePerRank = inputSize / this->inputChunks.at(rank);
@@ -157,15 +158,18 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz
 else
 throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError);
 
-size_t scratchBufferSize = sizePerRank * this->scratchChunks.at(rank);
+this->scratchBufferSize = sizePerRank * this->scratchChunks.at(rank);
 if (this->isUsingPacket) {
-scratchBufferSize *= 2; /* data + flag */
+this->scratchBufferSize *= 2; /* data + flag */
 }
 if (this->isUsingDoubleScratchBuffer) {
-scratchBufferSize *= 2; /* double buffer */
+this->scratchBufferSize *= 2; /* double buffer */
 }
-return scratchBufferSize;
+this->scratchBufferOffset = (this->isUsingDoubleScratchBuffer && (flag % 2) == 0) ? (this->scratchBufferSize / 2) : 0;
 }
+
+size_t ExecutionPlan::Impl::getScratchBufferSize() const { return this->scratchBufferSize; }
+
 std::vector<Operation> ExecutionPlan::Impl::getOperations(int rank, int threadblock) const {
 return this->operations.at(rank)[threadblock];
 }
@@ -174,10 +178,9 @@ int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->oper
 
 int ExecutionPlan::Impl::getNThreadsPerBlock() const { return this->nThreadsPerBlock; }
 
-bool ExecutionPlan::Impl::getIsUsingDoubleScratchBuffer() const { return this->isUsingDoubleScratchBuffer; }
-
-void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset,
-size_t constDstOffset) {
+void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t constSrcOffset,
+size_t constDstOffset, int selfRank, size_t inputBufferSize,
+size_t outputBufferSize, int flag) {
 std::ifstream file(this->planPath);
 json obj = json::parse(file);
 if (this->name != obj["name"]) {
@@ -202,11 +205,13 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize,
 
 this->inputSize = inputSize;
 this->outputSize = outputSize;
-this->setupOperations(gpus, contsSrcOffset, constDstOffset);
+this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag);
+this->setupOperations(gpus, constSrcOffset, constDstOffset);
 }
 
-void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset,
-size_t constDstOffset) {
+void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t constSrcOffset,
+size_t constDstOffset, int selfRank, size_t inputBufferSize,
+size_t outputBufferSize, int flag) {
 std::ifstream file(this->planPath);
 json obj = json::parse(file);
 if (this->name != obj["name"]) {
@@ -229,7 +234,8 @@ void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t output
 
 this->inputSize = inputSize;
 this->outputSize = outputSize;
-this->setupOperations(gpus, contsSrcOffset, constDstOffset);
+this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag);
+this->setupOperations(gpus, constSrcOffset, constDstOffset);
 }
 
 // Construct the channel info. Step 1. Flatten SM and PROXY channels into separate vectors.
@@ -299,7 +305,7 @@ void ExecutionPlan::Impl::setupChannels(const json& gpus) {
 }
 }
 
-void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffset, size_t constDstOffset) {
+void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffset, size_t constDstOffset) {
 // setup threadblocks and operations
 for (const auto& gpu : gpus) {
 int rank = gpu["id"];
@@ -334,7 +340,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse
 channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]];
 operation.inputOffsets[i] =
 this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["i_cids"][i]["off"]) +
-(srcBufferType != BufferType::SCRATCH ? contsSrcOffset : 0);
+(srcBufferType != BufferType::SCRATCH ? constSrcOffset : this->scratchBufferOffset);
 chunkIndexes.push_back((uint32_t)op["i_cids"][i]["off"]);
 }
 }
@@ -345,7 +351,7 @@
 for (int i = 0; i < operation.nInputs; i++) {
 operation.inputOffsets[i] =
 this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcs"][i]["off"]) +
-(operation.inputBufferType != BufferType::SCRATCH ? contsSrcOffset : 0);
+(operation.inputBufferType != BufferType::SCRATCH ? constSrcOffset : this->scratchBufferOffset);
 chunkIndexes.push_back((uint32_t)op["srcs"][i]["off"]);
 }
 }
@@ -358,7 +364,7 @@
 channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]];
 operation.outputOffsets[i] =
 this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["o_cids"][i]["off"]) +
-(dstBufferType != BufferType::SCRATCH ? constDstOffset : 0);
+(dstBufferType != BufferType::SCRATCH ? constDstOffset : this->scratchBufferOffset);
 chunkIndexes.push_back((uint32_t)op["o_cids"][i]["off"]);
 }
 }
@@ -369,7 +375,7 @@
 for (int i = 0; i < operation.nOutputs; i++) {
 operation.outputOffsets[i] =
 this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dsts"][i]["off"]) +
-(operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : 0);
+(operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : this->scratchBufferOffset);
 chunkIndexes.push_back((uint32_t)op["dsts"][i]["off"]);
 }
 }
@@ -378,13 +384,19 @@
 }
 if (op.contains("srcoff")) {
 operation.srcOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcoff"]);
+if (operation.srcBufferType == BufferType::SCRATCH) {
+operation.srcOffset += this->scratchBufferOffset;
+}
 chunkIndexes.push_back((uint32_t)op["srcoff"]);
 }
 if (op.contains("dstbuff")) {
 operation.dstBufferType = convertToBufferType(op["dstbuff"]);
 }
 if (op.contains("dstoff")) {
 operation.dstOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dstoff"]);
+if (operation.dstBufferType == BufferType::SCRATCH) {
+operation.dstOffset += this->scratchBufferOffset;
+}
 chunkIndexes.push_back((uint32_t)op["dstoff"]);
 }
 if (op.contains("cnt")) {
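The core of this file's change: the scratch-buffer size is no longer recomputed by a const getter. calcScratchBufferSizeAndOffset() caches both the total size and the half-buffer offset selected by the launch flag's parity, and getScratchBufferSize() becomes a plain read. A minimal standalone sketch of that arithmetic, using hypothetical free-function names (the real code stores the results in ExecutionPlan::Impl members):

#include <cstddef>

struct ScratchLayout {
  std::size_t size;    // total scratch bytes to allocate
  std::size_t offset;  // where the current launch's half begins
};

// Packet mode doubles the buffer to hold data + flag words, double-buffering
// doubles it again, and the launch flag's parity picks which half the current
// iteration writes into.
ScratchLayout calcScratch(std::size_t sizePerRank, std::size_t scratchChunks,
                          bool usingPacket, bool usingDoubleBuffer, int flag) {
  std::size_t size = sizePerRank * scratchChunks;
  if (usingPacket) size *= 2;
  if (usingDoubleBuffer) size *= 2;
  std::size_t offset = (usingDoubleBuffer && (flag % 2) == 0) ? size / 2 : 0;
  return {size, offset};
}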
41 changes: 18 additions & 23 deletions src/executor/executor.cc
@@ -66,7 +66,6 @@ struct ExecutionContext {
 size_t scratchBufferSize;
 std::shared_ptr<char> deviceExecutionPlansBuffer;
 int nthreadsPerBlock;
-bool isUsingDoubleScratchBuffer;
 };
 
 struct Executor::Impl {
@@ -83,11 +82,13 @@ struct Executor::Impl {
 
 ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize,
 size_t outputMessageSize, size_t contsSrcOffset, size_t constDstOffset,
-size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) {
+size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan,
+int flag) {
 ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name};
 if (this->contexts.find(key) != this->contexts.end()) {
 plan.impl_->operationsReset();
-plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset);
+plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset, rank,
+sendBufferSize, recvBufferSize, flag);
 this->setupDeviceExecutionPlan(this->contexts[key], rank, plan);
 this->contexts[key].deviceExecutionPlansBuffer =
 allocExtSharedCuda<char>(this->contexts[key].deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan));
@@ -98,16 +99,16 @@ struct Executor::Impl {
 }
 
 plan.impl_->reset();
-plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset);
+plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset, rank,
+sendBufferSize, recvBufferSize, flag);
 
 ExecutionContext context;
-size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize);
+size_t scratchBufferSize = plan.impl_->getScratchBufferSize();
 std::shared_ptr<char> scratchBuffer = allocExtSharedCuda<char>(scratchBufferSize);
 context.scratchBuffer = scratchBuffer;
 context.scratchBufferSize = scratchBufferSize;
 context.proxyService = std::make_shared<ProxyService>();
 context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock();
-context.isUsingDoubleScratchBuffer = plan.impl_->getIsUsingDoubleScratchBuffer();
 this->setupConnections(context, rank, plan);
 this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
 this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
@@ -305,13 +306,8 @@ struct Executor::Impl {
 }
 
 void launchKernel(ExecutionContext& context, int rank, void* sendbuff, void* recvbuff, DataType dataType,
-cudaStream_t stream, PacketType packetType) {
-static uint32_t flag = 0;
+cudaStream_t stream, PacketType packetType, uint32_t flag) {
 int nthreadblocks = context.deviceExecutionPlans.size();
-char* kernelScratchBufferPtr = context.scratchBuffer.get();
-if (context.isUsingDoubleScratchBuffer && (flag % 2)) {
-kernelScratchBufferPtr += context.scratchBufferSize / 2;
-}
 #if defined(ENABLE_NPKIT)
 #if defined(__HIP_PLATFORM_AMD__)
 if (nthreadblocks > NPKIT_MAX_NUM_GPU_THREADBLOCKS) {
@@ -327,16 +323,14 @@
 #endif
 switch (packetType) {
 case PacketType::LL16:
-ExecutionKernel::launchKernel<LL16Packet>(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff,
-(void*)kernelScratchBufferPtr, dataType,
-(DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(),
-sharedMemSize, stream, ++flag);
+ExecutionKernel::launchKernel<LL16Packet>(
+rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(),
+dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, flag);
 break;
 case PacketType::LL8:
-ExecutionKernel::launchKernel<LL8Packet>(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff,
-(void*)kernelScratchBufferPtr, dataType,
-(DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(),
-sharedMemSize, stream, ++flag);
+ExecutionKernel::launchKernel<LL8Packet>(
+rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(),
+dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, flag);
 break;
 default:
 throw Error("Invalid packet type", ErrorCode::ExecutorError);
@@ -349,17 +343,18 @@ Executor::Executor(std::shared_ptr<Communicator> comm) : impl_(std::make_unique<
 void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize,
 [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan,
 cudaStream_t stream, PacketType packetType) {
+static uint32_t flag = 1;
 size_t sendBytes, recvBytes;
 CUdeviceptr sendBasePtr, recvBasePtr;
 MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff));
 MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff));
 size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr;
 size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr;
 
 ExecutionContext context =
 this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize,
-offsetIn, offsetOut, sendBytes, recvBytes, plan);
-this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType);
+offsetIn, offsetOut, sendBytes, recvBytes, plan, flag);
+this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType, flag);
+flag++;
 }
 
 Executor::~Executor() = default;
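The executor-side theme of the diff: the per-launch flag used to live as a static counter inside launchKernel, which also bumped the scratch pointer on odd launches. It now starts in execute(), flows into plan loading (where calcScratchBufferSizeAndOffset bakes the half-buffer offset into every scratch offset), and reaches the kernel unchanged. A self-contained miniature of that control flow; the types and bodies here are stand-ins, not the mscclpp API:

#include <cstdint>

struct Ctx {};  // stand-in for ExecutionContext

Ctx setupExecutionContext(uint32_t flag) {
  // In the real code the flag reaches loadExecutionPlan /
  // lightLoadExecutionPlan, which derive scratchBufferOffset from its parity.
  (void)flag;
  return {};
}

void launchKernel(const Ctx&, uint32_t flag) {
  // The kernel now always receives the scratch-buffer base pointer; there is
  // no host-side pointer bump and no ++flag hidden inside the launcher.
  (void)flag;
}

void execute() {
  static uint32_t flag = 1;                   // one counter per executor
  Ctx context = setupExecutionContext(flag);  // offset baked in at load time
  launchKernel(context, flag);                // the same flag reaches the GPU
  flag++;                                     // next launch flips the half
}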
4 changes: 2 additions & 2 deletions src/include/execution_kernel.hpp
@@ -545,7 +545,7 @@ class ExecutionKernel {
 template <typename PacketType>
 static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
 DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream,
-uint32_t flag = 0) {
+uint32_t flag) {
 switch (dataType) {
 case DataType::INT32:
 executionKernel<int32_t, PacketType><<<nthreadblocks, nthreads, sharedMemSize, stream>>>(
@@ -603,7 +603,7 @@ class ExecutionKernel {
 template <typename PacketType>
 static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch,
 DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream,
-uint32_t flag = 0);
+uint32_t flag);
 #endif // !defined(MSCCLPP_DEVICE_HIP)
 };
 } // namespace mscclpp
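Why the header change matters: dropping the = 0 default from flag turns any call site that forgets to thread the counter into a compile error, rather than a silent always-zero flag that would pin packet kernels to one scratch half. A tiny hypothetical illustration of the pattern:

#include <cstdint>
#include <iostream>

// Hypothetical reduction of the signature change.
void launchOld(uint32_t flag = 0) { std::cout << "half " << flag % 2 << "\n"; }
void launchNew(uint32_t flag) { std::cout << "half " << flag % 2 << "\n"; }

int main() {
  launchOld();    // compiled fine, but silently always used flag == 0
  // launchNew(); // no longer compiles: the caller must supply the flag
  launchNew(3);   // the executor's counter is passed explicitly
  return 0;
}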
12 changes: 8 additions & 4 deletions src/include/execution_plan.hpp
@@ -65,14 +65,15 @@ struct ExecutionPlan::Impl {
 std::vector<ChannelInfo> getUnpairedChannelInfos(int rank, int worldSize, ChannelType channelType);
 std::vector<int> getConnectedPeers(int rank) const;
 std::vector<BufferType> getConnectedBufferTypes(int rank) const;
-size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const;
+size_t getScratchBufferSize() const;
 std::vector<Operation> getOperations(int rank, int threadblock) const;
 int getThreadblockCount(int rank) const;
 int getNThreadsPerBlock() const;
-bool getIsUsingDoubleScratchBuffer() const;
 
-void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset);
-void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset);
+void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset, int rank,
+size_t inputBufferSize, size_t outputBufferSize, int flag);
+void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset,
+int rank, size_t inputBufferSize, size_t outputBufferSize, int flag);
 void setupChannels(const nlohmann::json& gpus);
 void setupOperations(const nlohmann::json& gpus, size_t contsSrcOffset, size_t constDstOffset);
 
@@ -98,12 +99,15 @@ struct ExecutionPlan::Impl {
 size_t outputSize;
 int nThreadsPerBlock;
 bool isUsingDoubleScratchBuffer;
+size_t scratchBufferSize;
+size_t scratchBufferOffset;
 
 private:
 std::pair<size_t, u_int32_t> calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const;
 size_t getOffset(int rank, size_t inputSize, size_t outputSize, uint32_t chunkIndex, uint32_t alignment = 16) const;
 size_t getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks,
 const std::vector<uint32_t> offsets) const;
+void calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag);
 };
 
 } // namespace mscclpp
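Putting the header's new members together with the setupOperations edits above: every offset that targets the SCRATCH buffer now has scratchBufferOffset added at plan-setup time, while offsets into user buffers keep their constant src/dst adjustment. A hypothetical helper capturing that per-operation rule:

#include <cstddef>

// isScratch: does this offset point into the scratch buffer?
// constOffset: the constSrcOffset/constDstOffset applied to user buffers.
// scratchBufferOffset: half-buffer displacement chosen by the launch flag.
std::size_t resolveOffset(std::size_t chunkOffset, bool isScratch,
                          std::size_t constOffset, std::size_t scratchBufferOffset) {
  return chunkOffset + (isScratch ? scratchBufferOffset : constOffset);
}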
