Commit

WIP
Binyang2014 committed Oct 30, 2024
1 parent a125378 commit cd5e04a
Showing 3 changed files with 23 additions and 12 deletions.
15 changes: 11 additions & 4 deletions src/executor/execution_plan.cc
@@ -189,6 +189,8 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize,
if (protocol == "LL") {
this->isUsingPacket = true;
}
this->inputSize = inputSize;
this->outputSize = outputSize;
this->nThreadsPerBlock = obj.value("num_threads_per_block", 1024);
const auto& gpus = obj["gpus"];

@@ -200,9 +202,6 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize,
this->chunkGroups[rank] = gpu["chunkGroups"];
}
this->setupChannels(gpus);

this->inputSize = inputSize;
this->outputSize = outputSize;
this->setupOperations(gpus, contsSrcOffset, constDstOffset);
}

@@ -242,7 +241,7 @@ void ExecutionPlan::Impl::parseChannels(
NvlsInfo info;
info.bufferType = convertToBufferType(channel["buff"]);
for (const auto& group : channel["rankGroups"]) {
info.bufferSize = group["size"];
info.bufferSize = (int)group["size"] * this->getUpperBoundChunkSize(rank, this->inputSize, this->outputSize);
for (int rank : group["ranks"]) {
info.ranks.push_back(rank);
}
@@ -503,11 +502,19 @@ size_t ExecutionPlan::Impl::getNChunkSize(int rank, size_t inputSize, size_t out
return nChunkSize;
}

size_t ExecutionPlan::Impl::getUpperBoundChunkSize(int rank, size_t inputSize, size_t outputSize) const {
auto sizePerRank = calcSizePerRank(rank, inputSize, outputSize);
uint32_t nChunks = sizePerRank.second;
return (sizePerRank.first + nChunks - 1) / nChunks;
}

void ExecutionPlan::Impl::reset() {
this->operations.clear();
this->channelInfos.clear();
this->nvlsInfos.clear();
this->threadblockSMChannelMap.clear();
this->threadblockProxyChannelMap.clear();
this->threadblockNvlsChannelMap.clear();
this->inputChunks.clear();
this->outputChunks.clear();
this->scratchChunks.clear();
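
For context on the arithmetic this change introduces: getUpperBoundChunkSize() is a ceiling division of a rank's byte count by its chunk count, and parseChannels() now sizes the NVLS buffer as the rank group's chunk count times that upper bound instead of taking the "size" field verbatim. Below is a minimal, self-contained sketch of that math with made-up inputs (the sizePerRank pair and the group size are hypothetical values, not read from a real execution plan):

```cpp
// Standalone sketch of the chunk-size math in this commit.
// The values are hypothetical; this is not the ExecutionPlan implementation.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>

// Ceiling division of the per-rank byte count by the number of chunks,
// mirroring what getUpperBoundChunkSize() computes from calcSizePerRank().
size_t upperBoundChunkSize(std::pair<size_t, uint32_t> sizePerRank) {
  uint32_t nChunks = sizePerRank.second;
  return (sizePerRank.first + nChunks - 1) / nChunks;
}

int main() {
  // Assume a rank owns 1 MiB split into 3 chunks (made-up numbers).
  std::pair<size_t, uint32_t> sizePerRank(1 << 20, 3);
  size_t chunkUpperBound = upperBoundChunkSize(sizePerRank);  // 349526 bytes

  // parseChannels() now multiplies the rank group's chunk count by the upper
  // bound rather than using the raw group["size"] value as the buffer size.
  int groupSize = 3;  // stands in for group["size"] in the plan JSON
  size_t nvlsBufferSize = static_cast<size_t>(groupSize) * chunkUpperBound;

  std::cout << "chunk upper bound: " << chunkUpperBound
            << ", NVLS buffer size: " << nvlsBufferSize << "\n";
  return 0;
}
```

Because the division rounds up, the buffer is never smaller than the chunk count times the largest chunk, even when the per-rank size does not divide evenly. This is presumably also why the inputSize/outputSize assignments were moved ahead of setupChannels() in this commit: parseChannels(), a channel-setup helper, now reads them.
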
13 changes: 7 additions & 6 deletions src/include/execution_plan.hpp
@@ -100,12 +100,6 @@ struct ExecutionPlan::Impl {
void setupChannels(const nlohmann::json& gpus);
void setupOperations(const nlohmann::json& gpus, size_t contsSrcOffset, size_t constDstOffset);

// helper functions to setup the channels
void parseChannels(
const nlohmann::json& gpu, std::vector<ChannelInfo>& channelInfos, std::vector<NvlsInfo>& nvlsInfos,
std::map<std::tuple<int, BufferType, BufferType, ChannelType>, std::vector<int>>& chanConnectedPeersMap,
int rank);

void reset();
void operationsReset();

@@ -136,6 +130,13 @@ struct ExecutionPlan::Impl {
size_t getOffset(int rank, size_t inputSize, size_t outputSize, uint32_t chunkIndex, uint32_t alignment = 16) const;
size_t getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks,
const std::vector<uint32_t> offsets) const;
size_t getUpperBoundChunkSize(int rank, size_t inputSize, size_t outputSize) const;

// helper functions to setup the channels
void parseChannels(
const nlohmann::json& gpu, std::vector<ChannelInfo>& channelInfos, std::vector<NvlsInfo>& nvlsInfos,
std::map<std::tuple<int, BufferType, BufferType, ChannelType>, std::vector<int>>& chanConnectedPeersMap,
int rank);
};

} // namespace mscclpp
7 changes: 5 additions & 2 deletions src/nvls.cc
@@ -73,8 +73,10 @@ NvlsConnection::Impl::Impl(size_t bufferSize, int numDevices) {
throw mscclpp::SysError("getpid() failed", errno);
}

INFO(MSCCLPP_COLL, "NVLS handle created on root with size %ld. minGranularity %ld and recommendedGranularity %ld\n",
mcProp_.size, minMcGran_, mcGran_);
INFO(MSCCLPP_COLL,
"NVLS handle created on root with size %ld. minGranularity %ld and recommendedGranularity %ld buffer size is "
"%ld\n",
mcProp_.size, minMcGran_, mcGran_, bufferSize);
}

NvlsConnection::Impl::Impl(const std::vector<char>& data) {
@@ -215,6 +217,7 @@ std::shared_ptr<char> NvlsConnection::Impl::bindMemoryToMulticastHandle(size_t o
MSCCLPP_CUTHROW(cuMemAddressReserve((CUdeviceptr*)(&mcPtr), bufferSize, minMcGran_, 0U, 0));
MSCCLPP_CUTHROW(cuMemMap((CUdeviceptr)(mcPtr), bufferSize, 0, mcHandle_, 0));
MSCCLPP_CUTHROW(cuMemSetAccess((CUdeviceptr)(mcPtr), bufferSize, &accessDesc, 1));
INFO(MSCCLPP_COLL, "NVLS connection bound memory at offset %ld, size %ld", offset, bufferSize);

auto deleter = [=, self = shared_from_this()](char* ptr) {
CUdevice device;
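
The new log lines in nvls.cc surface the multicast granularities next to the requested buffer size. As a reminder of why those numbers matter (a minimal sketch under the usual CUDA multicast constraint that sizes are multiples of the reported granularity; the alignUp helper and the sample sizes are hypothetical, not mscclpp code):

```cpp
// Minimal sketch of granularity alignment for an NVLS-style buffer size.
// Hypothetical helper and values; not the NvlsConnection implementation.
#include <cstddef>
#include <iostream>

// Round size up to the next multiple of gran.
size_t alignUp(size_t size, size_t gran) { return ((size + gran - 1) / gran) * gran; }

int main() {
  size_t bufferSize = 6 << 20;              // requested: 6 MiB (made-up)
  size_t minGranularity = 2 << 20;          // e.g. minMcGran_ reported as 2 MiB (made-up)
  size_t recommendedGranularity = 4 << 20;  // e.g. mcGran_ reported as 4 MiB (made-up)

  // A 6 MiB request already satisfies the 2 MiB minimum, but rounding to the
  // recommended granularity grows it to 8 MiB.
  std::cout << "min-aligned: " << alignUp(bufferSize, minGranularity) << "\n";         // 6291456
  std::cout << "recommended-aligned: " << alignUp(bufferSize, recommendedGranularity)  // 8388608
            << "\n";
  return 0;
}
```

Logging both granularities alongside bufferSize makes it easier to spot when the size actually reserved and mapped (as in bindMemoryToMulticastHandle above) differs from what the caller asked for.
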
