Skip to content

Commit

Permalink
Fix bug in semaphore construction (#341)
Browse files Browse the repository at this point in the history
Current semaphore construction requires two-way communication, e.g., to
construct a semaphore signaling from rank 0 to rank 1, both rank 0 and
rank 1 need to send a message to each other. This PR fixes an executor
bug that fails to conduct two-way communication for constructing such
one-way semaphores, and instead hangs during the semaphore construction.
In the future, we may need to change the implementation to construct
semaphore via one-way communication.

---------

Co-authored-by: Changho Hwang <[email protected]>
  • Loading branch information
Binyang2014 and chhwang authored Sep 4, 2024
1 parent 72b99a4 commit 26a8753
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 9 deletions.
46 changes: 46 additions & 0 deletions src/executor/execution_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,45 @@ std::vector<ChannelInfo> ExecutionPlan::Impl::getChannelInfos(int rank, BufferTy
return filter(this->channelInfos.at(rank), pred);
}

// Returns the channels that target `rank` (channels that other ranks declared with `rank` in their
// "connectedTo" list) and whose destination buffer type equals `bufferType`. In each returned ChannelInfo,
// connectedPeers holds the ranks the channel originates from.
//
// channelInfosByDstRank only has entries for ranks that some channel targets, so a rank without inbound
// channels yields an empty result instead of letting std::unordered_map::at throw std::out_of_range.
std::vector<ChannelInfo> ExecutionPlan::Impl::getChannelInfosByDstRank(int rank, BufferType bufferType) const {
  const auto it = this->channelInfosByDstRank.find(rank);
  if (it == this->channelInfosByDstRank.end()) {
    return {};
  }
  // `rank` only selects the per-rank list above; the predicate needs the buffer type alone.
  auto pred = [bufferType](const ChannelInfo& info) { return info.dstBufferType == bufferType; };
  return filter(it->second, pred);
}

std::vector<ChannelInfo> ExecutionPlan::Impl::getUnpairedChannelInfos(int rank, int worldSize,
ChannelType channelType) {
std::vector<ChannelInfo> unpaired;
for (int peer = 0; peer < worldSize; peer++) {
if (peer == rank) {
continue;
}
if (this->channelCountMap[{rank, channelType}][peer] < this->channelCountMap[{peer, channelType}][rank]) {
int count = this->channelCountMap[{peer, channelType}][rank] - this->channelCountMap[{rank, channelType}][peer];
for (int i = 0; i < count; i++) {
ChannelInfo info;
info.srcBufferType = BufferType::NONE;
info.dstBufferType = BufferType::NONE;
info.channelType = channelType;
info.connectedPeers.push_back(peer);
unpaired.push_back(info);
}
}
}
return unpaired;
}

// Returns the distinct peers of `rank`, in ascending order: every rank that `rank` declares a channel to,
// plus every rank that declares a channel to `rank`.
//
// The outbound lookup keeps using at(): it throws std::out_of_range if channelInfos has no entry for `rank`.
// The inbound map (channelInfosByDstRank) only has entries for ranks that some channel targets, so a rank
// with outgoing channels only is handled by skipping the inbound part rather than throwing.
std::vector<int> ExecutionPlan::Impl::getConnectedPeers(int rank) const {
  std::set<int> peers;
  for (const auto& info : this->channelInfos.at(rank)) {
    peers.insert(info.connectedPeers.begin(), info.connectedPeers.end());
  }
  if (const auto inbound = this->channelInfosByDstRank.find(rank); inbound != this->channelInfosByDstRank.end()) {
    for (const auto& info : inbound->second) {
      peers.insert(info.connectedPeers.begin(), info.connectedPeers.end());
    }
  }
  return std::vector<int>(peers.begin(), peers.end());
}

Expand Down Expand Up @@ -177,6 +209,8 @@ void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t contsS
// Construct the channel info. Step 1. Flatten SM and PROXY channels into separate vectors.
// Step 2. For each threadblock, construct a vector of channel indexes and keys.
void ExecutionPlan::Impl::setupChannels(const json& gpus) {
using mapKey = std::tuple<int, BufferType, BufferType, ChannelType>;
std::map<mapKey, std::vector<int>> chanConnectedPeersMap;
for (const auto& gpu : gpus) {
int rank = gpu["id"];
std::vector<ChannelInfo> channelInfos;
Expand All @@ -187,12 +221,24 @@ void ExecutionPlan::Impl::setupChannels(const json& gpus) {
info.channelType = convertToChannelType(channel["type"]);
for (const auto& peer : channel["connectedTo"]) {
info.connectedPeers.push_back(peer);
chanConnectedPeersMap[{peer, info.srcBufferType, info.dstBufferType, info.channelType}].push_back(rank);
this->channelCountMap[{rank, info.channelType}][peer]++;
}
channelInfos.push_back(info);
}
this->channelInfos[rank] = channelInfos;
}

for (const auto& [key, connectedFrom] : chanConnectedPeersMap) {
auto [peer, srcBufferType, dstBufferType, channelType] = key;
ChannelInfo info;
info.srcBufferType = srcBufferType;
info.dstBufferType = dstBufferType;
info.channelType = channelType;
info.connectedPeers = connectedFrom;
this->channelInfosByDstRank[peer].push_back(info);
}

// setup threadblockChannelMap
for (const auto& gpu : gpus) {
int rank = gpu["id"];
Expand Down
36 changes: 27 additions & 9 deletions src/executor/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ struct ExecutionContext {

struct Executor::Impl {
int nranksPerNode;
int nranks;
std::shared_ptr<Communicator> comm;
std::unordered_map<ExecutionContextKey, ExecutionContext> contexts;

Impl(std::shared_ptr<Communicator> comm) : comm(comm) { this->nranksPerNode = comm->bootstrap()->getNranksPerNode(); }
Impl(std::shared_ptr<Communicator> comm) : comm(comm) {
this->nranksPerNode = comm->bootstrap()->getNranksPerNode();
this->nranks = comm->bootstrap()->getNranks();
}
~Impl() = default;

ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t messageSize,
Expand Down Expand Up @@ -169,14 +173,18 @@ struct Executor::Impl {

std::vector<BufferType> bufferTypes = plan.impl_->getConnectedBufferTypes(rank);
for (BufferType bufferType : bufferTypes) {
std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(rank, bufferType);
std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfosByDstRank(rank, bufferType);
TransportFlags transportFlags = getTransportFlags(channelInfos, rank);
RegisteredMemory memory =
this->comm->registerMemory(getBufferInfo(bufferType).first, getBufferInfo(bufferType).second, transportFlags);
std::vector<int> connectedPeers = getConnectedPeers(channelInfos);
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteRegMemoryFutures;
for (int peer : connectedPeers) {
comm->sendMemoryOnSetup(memory, peer, 0);
}
channelInfos = plan.impl_->getChannelInfos(rank, bufferType);
connectedPeers = getConnectedPeers(channelInfos);
for (int peer : connectedPeers) {
remoteRegMemoryFutures.push_back(comm->recvMemoryOnSetup(peer, 0));
}
comm->setup();
Expand All @@ -201,19 +209,29 @@ struct Executor::Impl {
const auto channelTypes = {ChannelType::SM, ChannelType::PROXY};
std::vector<std::shared_ptr<SmDevice2DeviceSemaphore>> smSemaphores;
std::vector<mscclpp::SemaphoreId> proxySemaphores;
for (ChannelType channelType : channelTypes) {
std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(rank, channelType);
auto processChannelInfos = [&](std::vector<ChannelInfo>& channelInfos) {
for (ChannelInfo& info : channelInfos) {
for (int peer : info.connectedPeers) {
if (channelType == ChannelType::SM) {
if (info.channelType == ChannelType::SM) {
smSemaphores.push_back(
std::make_shared<SmDevice2DeviceSemaphore>(*this->comm, context.connections.at(peer)));
} else if (channelType == ChannelType::PROXY) {
} else if (info.channelType == ChannelType::PROXY) {
proxySemaphores.push_back(
context.proxyService->buildAndAddSemaphore(*this->comm, context.connections.at(peer)));
}
}
}
};
for (ChannelType channelType : channelTypes) {
std::vector<ChannelInfo> channelInfos = plan.impl_->getChannelInfos(rank, channelType);
processChannelInfos(channelInfos);
// Current semaphore construction requires two-way communication, e.g., to construct a semaphore signaling from
// rank 0 to rank 1, both rank 0 and rank 1 need to send a message to each other. Therefore, also build
// semaphores for the unpaired channels, i.e., channels that a peer declares towards this rank without a
// matching channel declared by this rank; otherwise the construction of such one-way semaphores hangs.
// In the future, we may need to change the implementation to construct semaphores via one-way communication.
channelInfos = plan.impl_->getUnpairedChannelInfos(rank, nranks, channelType);
processChannelInfos(channelInfos);
}
this->comm->setup();
context.smSemaphores = std::move(smSemaphores);
Expand Down Expand Up @@ -315,9 +333,9 @@ struct Executor::Impl {

Executor::Executor(std::shared_ptr<Communicator> comm) : impl_(std::make_unique<Impl>(comm)) {}

void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize, size_t recvBuffSize,
DataType dataType, int nthreads, const ExecutionPlan& plan, cudaStream_t stream,
PacketType packetType) {
void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize,
[[maybe_unused]] size_t recvBuffSize, DataType dataType, int nthreads, const ExecutionPlan& plan,
cudaStream_t stream, PacketType packetType) {
size_t sendBytes, recvBytes;
CUdeviceptr sendBasePtr, recvBasePtr;
MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff));
Expand Down
1 change: 1 addition & 0 deletions src/include/execution_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ constexpr int MAX_CHANNEL_PER_OPERATION = 8;
constexpr int MAX_OPERATION = 64;

enum class BufferType : uint8_t {
NONE,
INPUT,
OUTPUT,
SCRATCH,
Expand Down
14 changes: 14 additions & 0 deletions src/include/execution_plan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ struct hash<mscclpp::ChannelKey> {
std::hash<int>()(static_cast<int>(key.dstBufferType)) ^ std::hash<int>()(static_cast<int>(key.channelType));
}
};

// Hash support for std::pair<int, mscclpp::ChannelType>, so such pairs can be used as keys of unordered
// containers.
template <>
struct hash<std::pair<int, mscclpp::ChannelType>> {
  std::size_t operator()(const std::pair<int, mscclpp::ChannelType>& key) const {
    const std::size_t firstHash = std::hash<int>()(key.first);
    const std::size_t secondHash = std::hash<int>()(static_cast<int>(key.second));
    // Same mixing step as boost's hash_combine, seeded with the hash of the first element.
    std::size_t seed = firstHash;
    seed ^= secondHash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
    return seed;
  }
};
} // namespace std

namespace mscclpp {
Expand All @@ -51,6 +61,8 @@ struct ExecutionPlan::Impl {

std::vector<ChannelInfo> getChannelInfos(int rank, ChannelType channelType) const;
std::vector<ChannelInfo> getChannelInfos(int rank, BufferType bufferType) const;
std::vector<ChannelInfo> getChannelInfosByDstRank(int rank, BufferType bufferType) const;
std::vector<ChannelInfo> getUnpairedChannelInfos(int rank, int worldSize, ChannelType channelType);
std::vector<int> getConnectedPeers(int rank) const;
std::vector<BufferType> getConnectedBufferTypes(int rank) const;
size_t getScratchBufferSize(int rank, size_t inputSize) const;
Expand All @@ -71,6 +83,8 @@ struct ExecutionPlan::Impl {
// operations for [rank][threadblock] = [operations]
std::unordered_map<int, std::vector<std::vector<Operation>>> operations;
std::unordered_map<int, std::vector<ChannelInfo>> channelInfos;
std::unordered_map<int, std::vector<ChannelInfo>> channelInfosByDstRank;
std::unordered_map<std::pair<int, ChannelType>, std::unordered_map<int, int>> channelCountMap;
// threadblockChannelMap[rank][threadblock] = [channelIndex, channelKey]
std::unordered_map<int, std::vector<std::vector<std::pair<int, ChannelKey>>>> threadblockSMChannelMap;
std::unordered_map<int, std::vector<std::vector<std::pair<int, ChannelKey>>>> threadblockProxyChannelMap;
Expand Down

0 comments on commit 26a8753

Please sign in to comment.