Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear minor warnings #214

Merged
merged 2 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,6 @@ class NonblockingFuture {
/// @param future The shared future to move.
NonblockingFuture(std::shared_future<T>&& future) : future(std::move(future)) {}

/// Copy constructor.
///
/// @param other The @ref NonblockingFuture to copy.
NonblockingFuture(const NonblockingFuture& other) = default;

/// Check if the value is ready to be retrieved.
///
/// @return True if the value is ready, false otherwise.
Expand Down
2 changes: 1 addition & 1 deletion python/test/_cpp/proxy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class MyProxyService {
semaphores_(semaphores),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
cudaGetDevice(&cudaDevice);
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice));
deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);
}

Expand Down
46 changes: 16 additions & 30 deletions test/allgather_test_cpp.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@

static int nranksPerNode = 8;

// Propagate errors up

#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
return res; \
} \
} while (0)

// Check CUDA RT calls
#define CUDACHECK(cmd) \
do { \
Expand All @@ -54,8 +42,7 @@ template <class T>
using DeviceHandle = mscclpp::DeviceHandle<T>;
__constant__ DeviceHandle<mscclpp::SimpleProxyChannel> constProxyChans[16];

__device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int remoteRank, size_t nelemsPerGPU) {
__device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, size_t nelemsPerGPU) {
// this allgather is really simple and implemented as an alltoall

// this thread's role is a sender role
Expand All @@ -70,8 +57,8 @@ __device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
if ((threadIdx.x % 32) == 0) proxyChan.wait();
}

__device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) {
__device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nranksPerNode,
int remoteRank, uint64_t offset, uint64_t size) {
// this allgather algorithm works as follows:
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
// and waits for data from GPU rank (i-1) % nranksPerNode
Expand All @@ -91,9 +78,9 @@ __device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyCh
}
}

__device__ void allgather1(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int nranksPerNode, int remoteRank, size_t nelemsPerGPU) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
__device__ void allgather1(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nranksPerNode,
int remoteRank, size_t nelemsPerGPU) {
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
if (remoteRank / nranksPerNode == rank / nranksPerNode)
if ((threadIdx.x % 32) == 0) proxyChan.flush();
Expand All @@ -116,7 +103,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// Step 1
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
}
// cross-node exchange
Expand All @@ -134,7 +121,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// local allgather
int otherNghr = (rank + nranksPerNode) % world_size;
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
}

Expand All @@ -152,7 +139,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// Step 3
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank,
localAllGather(proxyChan, rank, nranksPerNode, remoteRank,
(otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
nelemsPerGPU / pipelineSize * sizeof(int));
}
Expand All @@ -170,9 +157,9 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem
DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan = constProxyChans[warpId];

if (kernel == 0)
allgather0(proxyChan, rank, world_size, remoteRank, nelemsPerGPU);
allgather0(proxyChan, rank, nelemsPerGPU);
else if (kernel == 1)
allgather1(proxyChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
allgather1(proxyChan, rank, nranksPerNode, remoteRank, nelemsPerGPU);
else if (kernel == 2)
allgather2(proxyChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
}
Expand Down Expand Up @@ -388,7 +375,6 @@ int main(int argc, const char* argv[]) {
}
ip_port = (char*)parsedArgs["ip_port"].c_str();

int thisNode = rankToNode(rank);
int cudaNum = rankToLocalRank(rank);
CUDACHECK(cudaSetDevice(cudaNum));

Expand Down Expand Up @@ -452,19 +438,19 @@ int main(int argc, const char* argv[]) {
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
CUDACHECK(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
CUDACHECK(cudaStreamEndCapture(stream, &graph));
CUDACHECK(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

int cudagraphwarmup = 10;
if (rank == 0)
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
cudagraphiter);
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
CUDACHECK(cudaGraphLaunch(instance, stream));
}
CUDACHECK(cudaStreamSynchronize(stream));

Expand All @@ -477,7 +463,7 @@ int main(int argc, const char* argv[]) {
double t0, t1, ms, time_in_us;
t0 = getTime();
for (int i = 0; i < cudagraphlaunch; ++i) {
cudaGraphLaunch(instance, stream);
CUDACHECK(cudaGraphLaunch(instance, stream));
}
CUDACHECK(cudaStreamSynchronize(stream));

Expand Down
82 changes: 35 additions & 47 deletions test/allgather_test_host_offloading.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,6 @@ int nranksPerNode;
int rank;
int world_size;

// Propagate errors up

// Check CUDA RT calls
#define CUCHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while (false)

// Measure current time in second.
static double getTime(void) {
struct timespec tspec;
Expand All @@ -45,8 +33,8 @@ static double getTime(void) {
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}

__global__ void kernel(int r, int nranks, mscclpp::FifoDeviceHandle fifo,
mscclpp::Host2DeviceSemaphore::DeviceHandle* handles, int handleIndex) {
__global__ void kernel(int r, mscclpp::FifoDeviceHandle fifo, mscclpp::Host2DeviceSemaphore::DeviceHandle* handles,
int handleIndex) {
int tid = threadIdx.x;
__syncthreads();
// uint64_t tail;
Expand Down Expand Up @@ -75,8 +63,8 @@ void print_usage(const char* prog) {

void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
int** data_d) {
CUCHECK(cudaMalloc(data_d, dataSize));
CUCHECK(cudaMemset(*data_d, 0, dataSize));
MSCCLPP_CUDATHROW(cudaMalloc(data_d, dataSize));
MSCCLPP_CUDATHROW(cudaMemset(*data_d, 0, dataSize));

*data_h = new int[nelemsPerGPU * world_size];
for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
Expand All @@ -87,29 +75,29 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz
(*data_h)[i] = 0;
}
}
CUCHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
MSCCLPP_CUDATHROW(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
}

class MyProxyService {
private:
int deviceNumaNode_;
mscclpp::Proxy proxy_;
int dataSize_;
std::vector<mscclpp::RegisteredMemory> remoteMemories_;
mscclpp::RegisteredMemory localMemory_;
std::vector<std::shared_ptr<mscclpp::Host2HostSemaphore>> hostSemaphores_;
std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>> deviceSemaphores1_;
std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>> deviceSemaphores2_;
std::vector<std::shared_ptr<mscclpp::Connection>> connections_;
int dataSize_;
mscclpp::Proxy proxy_;
int deviceNumaNode_;

public:
MyProxyService(mscclpp::Communicator& comm, int* data_d, int dataSize)
: remoteMemories_(world_size),
: dataSize_(dataSize),
remoteMemories_(world_size),
connections_(world_size),
dataSize_(dataSize),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
CUCHECK(cudaGetDevice(&cudaDevice));
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice));
deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);

int thisNode = rankToNode(rank);
Expand Down Expand Up @@ -237,7 +225,7 @@ int main(int argc, char* argv[]) {
MPI_Comm_free(&shmcomm);

int cudaNum = rankToLocalRank(rank);
CUCHECK(cudaSetDevice(cudaNum));
MSCCLPP_CUDATHROW(cudaSetDevice(cudaNum));

if (rank == 0) printf("Initializing MSCCL++\n");
auto bootstrap = std::make_shared<mscclpp::TcpBootstrap>(rank, world_size);
Expand Down Expand Up @@ -268,30 +256,30 @@ int main(int argc, char* argv[]) {
mscclpp::FifoDeviceHandle fifo = proxyService.fifo().deviceHandle();
if (rank == 0) printf("Testing the correctness of AllGather implementation\n");
cudaStream_t stream;
CUCHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
mscclpp::Host2DeviceSemaphore::DeviceHandle* deviceHandles1;
mscclpp::Host2DeviceSemaphore::DeviceHandle* deviceHandles2;

CUCHECK(cudaMalloc(&deviceHandles1, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
MSCCLPP_CUDATHROW(cudaMalloc(&deviceHandles1, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
for (int i = 0; i < world_size; ++i) {
if (i == rank) continue;
auto handle = proxyService.getDeviceHandle1(i);
CUCHECK(cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
MSCCLPP_CUDATHROW(cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
}

CUCHECK(cudaMalloc(&deviceHandles2, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
MSCCLPP_CUDATHROW(cudaMalloc(&deviceHandles2, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
for (int i = 0; i < world_size; ++i) {
if (i == rank) continue;
auto handle = proxyService.getDeviceHandle2(i);
CUCHECK(cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
MSCCLPP_CUDATHROW(cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
}

kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
CUCHECK(cudaStreamSynchronize(stream));
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles1, 1);
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

CUCHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));
MSCCLPP_CUDATHROW(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));

for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
int val = i + 1;
Expand All @@ -307,14 +295,14 @@ int main(int argc, char* argv[]) {
double t0, t1, ms, time_in_us;
int iterwithoutcudagraph = 10;
if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));
bootstrap->barrier();
t0 = getTime();
for (int i = 0; i < iterwithoutcudagraph; ++i) {
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles2, 2);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles2, 2);
}
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));
bootstrap->barrier();
t1 = getTime();
ms = (t1 - t0) * 1000.0;
Expand All @@ -327,22 +315,22 @@ int main(int argc, char* argv[]) {
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
MSCCLPP_CUDATHROW(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles2, 2);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles2, 2);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
MSCCLPP_CUDATHROW(cudaStreamEndCapture(stream, &graph));
MSCCLPP_CUDATHROW(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

int cudagraphwarmup = 10;
if (rank == 0)
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
cudagraphiter);
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
MSCCLPP_CUDATHROW(cudaGraphLaunch(instance, stream));
}
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

// measure runtime
int cudagraphlaunch = 10;
Expand All @@ -352,9 +340,9 @@ int main(int argc, char* argv[]) {
bootstrap->barrier();
t0 = getTime();
for (int i = 0; i < cudagraphlaunch; ++i) {
cudaGraphLaunch(instance, stream);
MSCCLPP_CUDATHROW(cudaGraphLaunch(instance, stream));
}
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

t1 = getTime();
ms = (t1 - t0) * 1000.0;
Expand Down
Loading