Skip to content

Commit

Permalink
Add cross threadblock barrier (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
Binyang2014 authored Nov 26, 2024
1 parent 1b8d020 commit 593478e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
2 changes: 1 addition & 1 deletion include/mscclpp/npkit/npkit_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4

#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x17
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x18

#endif
8 changes: 8 additions & 0 deletions src/executor/execution_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ std::vector<T> filter(const std::vector<T>& vec, Predicate pred) {

auto getOpType = [](const std::string& str) {
if (str == "nop") {
return mscclpp::OperationType::NOP;
} else if (str == "barrier") {
return mscclpp::OperationType::BARRIER;
} else if (str == "put") {
return mscclpp::OperationType::PUT;
Expand Down Expand Up @@ -456,6 +458,12 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffse
operation.size =
this->getNChunkSize(rank, this->inputSize, this->outputSize, (uint32_t)op["cnt"], chunkIndexes);
}
if (op.contains("barrier_id")) {
operation.deviceSyncerIndex = op["barrier_id"];
}
if (op.contains("nthread_blocks")) {
operation.nThreadBlocks = op["nthread_blocks"];
}
ops.push_back(operation);
}
this->operations[rank].push_back(ops);
Expand Down
20 changes: 15 additions & 5 deletions src/include/execution_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum class ChannelType : uint8_t {

// NOTE(chhwang): any modification here requires corresponding updates in `tools/npkit/npkit_trace_generator.py`.
enum class OperationType : uint8_t {
NOP,
BARRIER,
PUT,
PUT_PACKET,
Expand Down Expand Up @@ -78,11 +79,20 @@ struct Operation {
BufferType outputBufferType;
uint8_t nvlsOutputIndex;
};
uint32_t inputOffsets[MAX_CHANNEL_PER_OPERATION];
uint32_t outputOffsets[MAX_CHANNEL_PER_OPERATION];
uint32_t srcOffset;
uint32_t dstOffset;
uint32_t size;
union {
// For Barrier operation
struct {
uint32_t deviceSyncerIndex;
uint32_t nThreadBlocks;
};
struct {
uint32_t inputOffsets[MAX_CHANNEL_PER_OPERATION];
uint32_t outputOffsets[MAX_CHANNEL_PER_OPERATION];
uint32_t srcOffset;
uint32_t dstOffset;
uint32_t size;
};
};
};

// total size = 2304 + 6400 + 4 + 12(padding) = 8720 bytes
Expand Down
10 changes: 9 additions & 1 deletion src/include/execution_kernel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#if defined(ENABLE_NPKIT)
#include <mscclpp/npkit/npkit.hpp>
#endif
#include <mscclpp/concurrency_device.hpp>
#include <mscclpp/packet_device.hpp>
#include <mscclpp/proxy_channel.hpp>
#include <mscclpp/sm_channel.hpp>
Expand Down Expand Up @@ -172,6 +173,9 @@ struct VectorType<float> {

namespace mscclpp {

#define MAX_DEVICE_SYNCERS 16
__device__ DeviceSyncer deviceSyncers[MAX_DEVICE_SYNCERS];

#if defined(MSCCLPP_DEVICE_COMPILE)

template <typename T>
Expand Down Expand Up @@ -526,8 +530,12 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu
event_buffer, &event_buffer_head);
#endif

if (op.type == OperationType::BARRIER) {
if (op.type == OperationType::NOP) {
__syncthreads();
} else if (op.type == OperationType::BARRIER) {
int nThreadBlocks = op.nThreadBlocks;
int syncStateIndex = op.deviceSyncerIndex;
deviceSyncers[syncStateIndex].sync(nThreadBlocks);
} else if (op.type == OperationType::SIGNAL) {
handleSignal(smChannels, proxyChannels, op.outputChannelIndexes, op.nOutputs, op.channelType);
} else if (op.type == OperationType::WAIT) {
Expand Down
11 changes: 6 additions & 5 deletions test/executor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ int main(int argc, char* argv[]) {
}

mscclpp::ExecutionPlan plan(executionPlanName, executionPlanPath);
#if (CUDA_NVLS_SUPPORTED)
std::shared_ptr<char> sendbuff = mscclpp::allocSharedPhysicalCuda<char>(bufferSize);
#else
std::shared_ptr<char> sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
#endif
std::shared_ptr<char> sendbuff;
if (mscclpp::isNvlsSupported()) {
sendbuff = mscclpp::allocSharedPhysicalCuda<char>(bufferSize);
} else {
sendbuff = mscclpp::allocExtSharedCuda<char>(bufferSize);
}
std::vector<int> dataHost(bufferSize / sizeof(int), rank);
MSCCLPP_CUDATHROW(cudaMemcpy(sendbuff.get(), dataHost.data(), bufferSize, cudaMemcpyHostToDevice));
double deltaSec = benchTime(rank, bootstrap, executor, plan, sendbuff, bufferSize, niters, ngraphIters, packetType);
Expand Down
1 change: 1 addition & 0 deletions tools/npkit/npkit_trace_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
def parse_npkit_event_header(npkit_event_header_path):
npkit_event_def = {"id_to_type": {}, "type_to_id": {}}
executor_ops = [
"NOP",
"BARRIER",
"PUT",
"PUT_PACKET",
Expand Down

0 comments on commit 593478e

Please sign in to comment.