NVLS support. #250

Merged · 73 commits · Feb 5, 2024

Changes from 66 commits

Commits
472ec60  nvls barely works now (Jan 2, 2024)
a7b627a  a bit cleaned up (Jan 2, 2024)
4ff8a89  clean up (Jan 2, 2024)
dfab9fe  nvls connection wip (Jan 3, 2024)
643f124  wip (Jan 5, 2024)
be04f9e  wip (Jan 5, 2024)
2211a14  restoring registered memory changes (saeedmaleki, Jan 5, 2024)
f5acce8  wip (Jan 11, 2024)
985d8c7  testing pid_getfd (Jan 15, 2024)
5cc805b  removing ipc dep (Jan 15, 2024)
abfba07  clean up (Jan 15, 2024)
c062afe  possibly compilable? (Jan 15, 2024)
a6e9af5  lint (Jan 17, 2024)
d958a31  compiles (Jan 17, 2024)
87a293f  wip (Jan 17, 2024)
f1dfc0d  wip (Jan 17, 2024)
76a6dd5  wip (Jan 18, 2024)
1dc4e83  wip (Jan 19, 2024)
5af9da5  new construct for nvls connection (Jan 19, 2024)
847f1d8  lint (Jan 19, 2024)
32c15b1  wip (Jan 19, 2024)
01d752b  wip (Jan 19, 2024)
a6f933c  Merge branch 'saemal/nvls' of https://github.com/microsoft/mscclpp in… (Jan 19, 2024)
2631f99  wip (Jan 19, 2024)
97db00c  wip (Jan 19, 2024)
4957ee6  Merge branch 'saemal/nvls' of https://github.com/microsoft/mscclpp in… (Jan 19, 2024)
caf997a  wip (Jan 19, 2024)
9d5a262  compiles (Jan 19, 2024)
8705127  wip (Jan 19, 2024)
8e1a2dc  wip (Jan 19, 2024)
4d8b214  fix (Binyang2014, Jan 19, 2024)
6d31e62  wip (Jan 19, 2024)
3011707  memalloc added (Jan 20, 2024)
e449835  looks like it is working (Jan 20, 2024)
7715776  it works (Jan 20, 2024)
855b2ee  missing files (Jan 21, 2024)
1120070  back to a working version (Jan 21, 2024)
d2d5ec0  clean up - wip (Jan 21, 2024)
08e077a  clean up -- wip (Jan 21, 2024)
887790a  ok cleaned up (Jan 21, 2024)
d570916  starting to look good (Jan 21, 2024)
9d43d4d  wip (Jan 21, 2024)
d3f4243  correctness check passes! (Jan 21, 2024)
d30f557  debugging (Jan 21, 2024)
572b30b  update (Binyang2014, Jan 21, 2024)
8a710ab  update (Binyang2014, Jan 21, 2024)
2aeae96  fix (Binyang2014, Jan 21, 2024)
43d60ae  fix benchmark (Jan 22, 2024)
a9e274b  a bit clean up (Jan 22, 2024)
c1ebc3e  correctness works now (Jan 22, 2024)
96303ce  all works for h100 (Jan 22, 2024)
feca28f  lint (Jan 22, 2024)
21820a6  works (Binyang2014, Jan 22, 2024)
20f3b0f  clean up (Binyang2014, Jan 23, 2024)
2ec813e  lint (Binyang2014, Jan 23, 2024)
5a30575  Merge branch 'main' into saemal/nvls (Binyang2014, Jan 23, 2024)
f493d22  pass build (Binyang2014, Jan 23, 2024)
9e5f0e6  fix benchmark (Binyang2014, Jan 23, 2024)
2eb20af  pass test (Binyang2014, Jan 23, 2024)
8e32bd2  fix (Binyang2014, Jan 23, 2024)
5ecb01f  clean up (Binyang2014, Jan 23, 2024)
292c240  fix (Binyang2014, Jan 23, 2024)
a9f0280  HIP compatibility (chhwang, Jan 23, 2024)
3e6eab4  Merge branch 'main' into saemal/nvls (chhwang, Jan 23, 2024)
f1ec278  minor updates (chhwang, Jan 23, 2024)
032c00a  minor update (chhwang, Jan 23, 2024)
6b064e6  Merge branch 'main' into saemal/nvls (Binyang2014, Jan 24, 2024)
fa0565f  add more tests (Binyang2014, Jan 24, 2024)
28fd377  move multimem instruction to source code (Binyang2014, Jan 24, 2024)
9bac9e8  restore file (Binyang2014, Jan 29, 2024)
7291380  address comments (Binyang2014, Jan 29, 2024)
2acddec  add comment (Binyang2014, Jan 31, 2024)
bd28082  remove useless file (Binyang2014, Feb 2, 2024)
64 changes: 62 additions & 2 deletions include/mscclpp/core.hpp
@@ -13,6 +13,9 @@
#include <bitset>
#include <future>
#include <memory>
#include <mscclpp/gpu.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <mscclpp/nvls_device.hpp>
#include <string>
#include <vector>

@@ -40,6 +43,7 @@ class Bootstrap {
virtual void allGather(void* allData, int size) = 0;
virtual void barrier() = 0;

void groupBarrier(const std::vector<int>& ranks);
void send(const std::vector<char>& data, int peer, int tag);
void recv(std::vector<char>& data, int peer, int tag);
};
@@ -125,6 +129,7 @@ class TcpBootstrap : public Bootstrap {
enum class Transport {
Unknown, // Unknown transport type.
CudaIpc, // CUDA IPC transport type.
Nvls, // NVLS transport type.
IB0, // InfiniBand device 0 transport type.
IB1, // InfiniBand device 1 transport type.
IB2, // InfiniBand device 2 transport type.
@@ -136,10 +141,11 @@ enum class Transport {
NumTransports // The number of transports.
};

const std::string TransportNames[] = {"UNK", "IPC", "IB0", "IB1", "IB2", "IB3", "IB4", "IB5", "IB6", "IB7", "NUM"};
const std::string TransportNames[] = {"UNK", "IPC", "NVLS", "IB0", "IB1", "IB2",
"IB3", "IB4", "IB5", "IB6", "IB7", "NUM"};

namespace detail {
const size_t TransportFlagsSize = 10;
const size_t TransportFlagsSize = 11;
static_assert(TransportFlagsSize == static_cast<size_t>(Transport::NumTransports),
"TransportFlagsSize must match the number of transports");
/// Bitset for storing transport flags.
@@ -445,26 +451,70 @@ class Connection {
static std::shared_ptr<Endpoint::Impl> getImpl(Endpoint& memory);
};

class NvlsConnection {
public:
NvlsConnection(size_t bufferSize, int numDevices);
NvlsConnection(const std::vector<char>& data);
NvlsConnection() = delete;
std::vector<char> serialize();

// Everyone needs to synchronize after creating an NVLS connection before adding devices.
void addDevice();
void addDevice(int cudaDeviceId);

struct DeviceMulticastPointer {
private:
std::shared_ptr<PhysicalCudaMemory<char>> deviceMem_;
std::shared_ptr<char> mcPtr_;
size_t bufferSize_;

public:
using DeviceHandle = DeviceMulticastPointerDeviceHandle;
DeviceMulticastPointer(std::shared_ptr<PhysicalCudaMemory<char>> deviceMem, std::shared_ptr<char> mcPtr,
size_t bufferSize)
: deviceMem_(deviceMem), mcPtr_(mcPtr), bufferSize_(bufferSize) {}
DeviceHandle deviceHandle();
char* getDevicePtr();

friend class NvlsConnection;
};

std::shared_ptr<DeviceMulticastPointer> allocateAndBindCuda(size_t size);
size_t getMultiCastMinGranularity();

private:
class Impl;
std::shared_ptr<Impl> pimpl_;
};

/// Used to configure an endpoint.
struct EndpointConfig {
static const int DefaultMaxCqSize = 1024;
static const int DefaultMaxCqPollNum = 1;
static const int DefaultMaxSendWr = 8192;
static const int DefaultMaxWrPerSend = 64;
static const int DefaultNvlsBufferSize = (1 << 29);

Transport transport;
int ibMaxCqSize = DefaultMaxCqSize;
int ibMaxCqPollNum = DefaultMaxCqPollNum;
int ibMaxSendWr = DefaultMaxSendWr;
int ibMaxWrPerSend = DefaultMaxWrPerSend;

size_t nvlsBufferSize = DefaultNvlsBufferSize;

/// Default constructor. Sets transport to Transport::Unknown.
EndpointConfig() : transport(Transport::Unknown) {}

/// Constructor that takes a transport and sets the other fields to their default values.
///
/// @param transport The transport to use.
EndpointConfig(Transport transport) : transport(transport) {}

/// Constructor for an NVLS endpoint.
/// @param transport Must be Transport::Nvls.
/// @param nvlsBufferSize The size of the buffer to be allocated on each device.
EndpointConfig(Transport transport, size_t nvlsBufferSize) : transport(transport), nvlsBufferSize(nvlsBufferSize) {}
};

/// Represents a context for communication. This provides a low-level interface for forming connections in use-cases
@@ -648,6 +698,16 @@ class Communicator {
/// to the connection.
NonblockingFuture<std::shared_ptr<Connection>> connectOnSetup(int remoteRank, int tag, EndpointConfig localConfig);

/// Connect to an NVLS collective on setup.
///
/// This function is used to connect to NVLS on setup. An NVLS collective uses multicast operations to send/recv
/// data, so all involved ranks must be placed into a single collective group when the connection is created.
///
/// @param allRanks The ranks of all processes involved in the collective.
/// @param config The configuration for the local endpoint.
/// @return std::shared_ptr<NvlsConnection> A shared pointer to the NVLS connection.
std::shared_ptr<NvlsConnection> connctNvlsCollective(std::vector<int> allRanks, EndpointConfig config);

/// Get the remote rank a connection is connected to.
///
/// @param connection The connection to get the remote rank for.
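
Taken together, the core.hpp additions imply a host-side flow like the sketch below. This is a minimal illustration, not code from this PR: it assumes an already-initialized Communicator whose bootstrap covers allRanks, the buffer size is a placeholder, and whether the caller must also invoke addDevice() or a later setup() round is not visible from this header, so both are omitted.

// Minimal host-side sketch (CUDA C++), assuming only the API declared above.
#include <mscclpp/core.hpp>
#include <mscclpp/utils.hpp>

void nvlsHostFlowSketch(mscclpp::Communicator& comm, const std::vector<int>& allRanks) {
  if (!mscclpp::isNvlsSupported()) return;  // runtime check declared in utils.hpp below

  // NVLS endpoint config: the transport plus the per-device buffer size.
  mscclpp::EndpointConfig config(mscclpp::Transport::Nvls, 1 << 29);

  // Collective call: every rank in allRanks must invoke it with the same arguments.
  // (The method name is spelled connctNvlsCollective in this revision.)
  std::shared_ptr<mscclpp::NvlsConnection> conn = comm.connctNvlsCollective(allRanks, config);

  // Allocation sizes must be divisible by the multicast granularity.
  size_t gran = conn->getMultiCastMinGranularity();
  auto mem = conn->allocateAndBindCuda(gran);

  // The device handle packages the per-device pointer and the multicast pointer
  // for use inside kernels; see nvls_device.hpp below for the struct a kernel receives.
  auto handle = mem->deviceHandle();
  (void)handle;
}
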
23 changes: 23 additions & 0 deletions include/mscclpp/gpu.hpp
@@ -19,6 +19,9 @@ using cudaIpcMemHandle_t = hipIpcMemHandle_t;

using CUresult = hipError_t;
using CUdeviceptr = hipDeviceptr_t;
using CUmemGenericAllocationHandle = hipMemGenericAllocationHandle_t;
using CUmemAllocationProp = hipMemAllocationProp;
using CUmemAccessDesc = hipMemAccessDesc;

constexpr auto cudaSuccess = hipSuccess;
constexpr auto cudaStreamNonBlocking = hipStreamNonBlocking;
@@ -32,6 +35,11 @@ constexpr auto cudaMemcpyHostToDevice = hipMemcpyHostToDevice;
constexpr auto cudaMemcpyDeviceToHost = hipMemcpyDeviceToHost;
constexpr auto cudaIpcMemLazyEnablePeerAccess = hipIpcMemLazyEnablePeerAccess;

constexpr auto CU_MEM_ALLOCATION_TYPE_PINNED = hipMemAllocationTypePinned;
constexpr auto CU_MEM_LOCATION_TYPE_DEVICE = hipMemLocationTypeDevice;
constexpr auto CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR = hipMemHandleTypePosixFileDescriptor;
constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWrite;

#ifndef CUDA_SUCCESS
#define CUDA_SUCCESS hipSuccess
#endif // CUDA_SUCCESS
@@ -68,7 +76,14 @@ constexpr auto cudaIpcMemLazyEnablePeerAccess = hipIpcMemLazyEnablePeerAccess;
#define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__)

#define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__)
#define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)
#define cuMemAddressFree(...) hipMemAddressFree(__VA_ARGS__)
#define cuMemGetAddressRange(...) hipMemGetAddressRange(__VA_ARGS__)
#define cuMemCreate(...) hipMemCreate(__VA_ARGS__)
#define cuMemRelease(...) hipMemRelease(__VA_ARGS__)
#define cuMemSetAccess(...) hipMemSetAccess(__VA_ARGS__)
#define cuMemMap(...) hipMemMap(__VA_ARGS__)
#define cuMemUnmap(...) hipMemUnmap(__VA_ARGS__)

#else

@@ -77,4 +92,12 @@ constexpr auto cudaIpcMemLazyEnablePeerAccess = hipIpcMemLazyEnablePeerAccess;

#endif

// NVLS
#if !defined(__HIP_PLATFORM_AMD__)
#include <linux/version.h>
#define USE_NVLS ((CUDART_VERSION >= 12010) && (LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0)))
#else // !defined(__HIP_PLATFORM_AMD__)
#define USE_NVLS 0
#endif // !defined(__HIP_PLATFORM_AMD__)

#endif // MSCCLPP_GPU_HPP_
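
USE_NVLS turns NVLS support into a compile-time property: a CUDA 12.1+ toolkit and a Linux 5.6+ kernel (5.6 is the release that added the pidfd_getfd syscall, which matches the "testing pid_getfd" commit above). A trivial sketch of branching on the macro, assuming only this header:

// Compile-time branch on USE_NVLS as defined in gpu.hpp above.
#include <mscclpp/gpu.hpp>

#include <cstdio>

int main() {
#if (USE_NVLS)
  std::printf("NVLS compiled in (CUDA >= 12.1, Linux kernel >= 5.6)\n");
#else
  std::printf("NVLS compiled out (AMD build, older CUDA toolkit, or older kernel)\n");
#endif
  return 0;
}
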
107 changes: 107 additions & 0 deletions include/mscclpp/gpu_utils.hpp
@@ -50,6 +50,18 @@ struct CudaStreamWithFlags {
cudaStream_t stream_;
};

template <class T>
struct CudaDeleter;

template <class T>
struct PhysicalCudaMemory {
CUmemGenericAllocationHandle memHandle_;
T* devicePtr_;
size_t size_;
PhysicalCudaMemory(CUmemGenericAllocationHandle memHandle, T* devicePtr, size_t size)
: memHandle_(memHandle), devicePtr_(devicePtr), size_(size) {}
};

namespace detail {

/// A wrapper of cudaMalloc that sets the allocated memory to zero.
@@ -67,6 +79,47 @@ T* cudaCalloc(size_t nelem) {
return ptr;
}

template <class T>
PhysicalCudaMemory<T>* cudaPhysicalCalloc(size_t nelem, size_t gran) {
AvoidCudaGraphCaptureGuard cgcGuard;

int deviceId = -1;
MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));

CUmemAllocationProp prop = {};
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
prop.location.id = deviceId;
#if defined(__HIP_PLATFORM_AMD__)
// TODO: revisit when HIP fixes this typo in the field name
prop.requestedHandleType = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
#else
prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
#endif

CUmemGenericAllocationHandle memHandle;
size_t bufferSize = sizeof(T) * nelem;
// allocate physical memory
MSCCLPP_CUTHROW(cuMemCreate(&memHandle, bufferSize, &prop, 0 /*flags*/));

CUmemAccessDesc accessDesc = {};
accessDesc.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
accessDesc.location.id = deviceId;
accessDesc.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;

T* devicePtr = nullptr;
// Map the device pointer
MSCCLPP_CUTHROW(cuMemAddressReserve((CUdeviceptr*)&devicePtr, bufferSize, gran, 0U, 0));
MSCCLPP_CUTHROW(cuMemMap((CUdeviceptr)devicePtr, bufferSize, 0, memHandle, 0));
MSCCLPP_CUTHROW(cuMemSetAccess((CUdeviceptr)devicePtr, bufferSize, &accessDesc, 1));
CudaStreamWithFlags stream(cudaStreamNonBlocking);
MSCCLPP_CUDATHROW(cudaMemsetAsync(devicePtr, 0, bufferSize, stream));

MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

return new PhysicalCudaMemory<T>(memHandle, devicePtr, bufferSize);
}

template <class T>
T* cudaExtCalloc(size_t nelem) {
AvoidCudaGraphCaptureGuard cgcGuard;
@@ -118,6 +171,25 @@ Memory safeAlloc(size_t nelem) {
return Memory(ptr, Deleter());
}

template <class T, T*(alloc)(size_t, size_t), class Deleter, class Memory>
Memory safeAlloc(size_t nelem, size_t gran) {
if ((nelem * sizeof(T)) % gran) {
throw Error("The request allocation size is not divisible by the required granularity:" +
chhwang marked this conversation as resolved.
Show resolved Hide resolved
std::to_string(nelem * sizeof(T)) + " vs " + std::to_string(gran),
ErrorCode::InvalidUsage);
}
T* ptr = nullptr;
try {
ptr = alloc(nelem, gran);
} catch (...) {
if (ptr) {
Deleter()(ptr);
}
throw;
}
return Memory(ptr, Deleter());
}

} // namespace detail

/// A deleter that calls cudaFree for use with std::unique_ptr or std::shared_ptr.
@@ -131,6 +203,17 @@ struct CudaDeleter {
}
};

template <class T>
struct CudaPhysicalDeleter {
static_assert(!std::is_array_v<T>, "T must not be an array");
void operator()(PhysicalCudaMemory<T>* ptr) {
AvoidCudaGraphCaptureGuard cgcGuard;
MSCCLPP_CUTHROW(cuMemUnmap((CUdeviceptr)ptr->devicePtr_, ptr->size_));
MSCCLPP_CUTHROW(cuMemAddressFree((CUdeviceptr)ptr->devicePtr_, ptr->size_));
MSCCLPP_CUTHROW(cuMemRelease(ptr->memHandle_));
delete ptr;  // also free the host-side PhysicalCudaMemory wrapper itself
}
};

/// A deleter that calls cudaFreeHost for use with std::unique_ptr or std::shared_ptr.
/// @tparam T Type of each element in the allocated memory.
template <class T>
@@ -151,6 +234,18 @@ std::shared_ptr<T> allocSharedCuda(size_t count = 1) {
return detail::safeAlloc<T, detail::cudaCalloc<T>, CudaDeleter<T>, std::shared_ptr<T>>(count);
}

/// Allocates physical memory on the device and returns a memory handle along with a device pointer for it.
/// Deallocation happens only when the PhysicalCudaMemory goes out of scope.
/// @tparam T Type of each element in the allocated memory.
/// @param count Number of elements to allocate.
/// @param gran The granularity of the allocation.
/// @return A std::shared_ptr to a PhysicalCudaMemory holding the memory handle and a device pointer.
template <class T>
std::shared_ptr<PhysicalCudaMemory<T>> allocSharedPhysicalCuda(size_t count, size_t gran) {
return detail::safeAlloc<PhysicalCudaMemory<T>, detail::cudaPhysicalCalloc<T>, CudaPhysicalDeleter<T>,
std::shared_ptr<PhysicalCudaMemory<T>>>(count, gran);
}

/// Allocates memory on the device and returns a std::shared_ptr to it. The memory is zeroed out.
/// @tparam T Type of each element in the allocated memory.
/// @param count Number of elements to allocate.
@@ -174,6 +269,18 @@ UniqueCudaPtr<T> allocUniqueCuda(size_t count = 1) {
return detail::safeAlloc<T, detail::cudaCalloc<T>, CudaDeleter<T>, UniqueCudaPtr<T>>(count);
}

/// Allocates physical memory on the device and returns a memory handle along with a device pointer for it.
/// The memory is zeroed out.
/// @tparam T Type of each element in the allocated memory.
/// @param count Number of elements to allocate.
/// @param gran The granularity of the allocation.
/// @return A std::unique_ptr to a PhysicalCudaMemory holding the memory handle and a device pointer.
template <class T>
std::unique_ptr<PhysicalCudaMemory<T>, CudaPhysicalDeleter<T>> allocUniquePhysicalCuda(size_t count, size_t gran) {
  return detail::safeAlloc<PhysicalCudaMemory<T>, detail::cudaPhysicalCalloc<T>, CudaPhysicalDeleter<T>,
                           std::unique_ptr<PhysicalCudaMemory<T>, CudaPhysicalDeleter<T>>>(count, gran);
}

/// Allocates memory on the device and returns a std::unique_ptr to it. The memory is zeroed out.
/// @tparam T Type of each element in the allocated memory.
/// @param count Number of elements to allocate.
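
One thing the new allocators leave to the caller is where gran comes from. With the CUDA virtual memory management API the usual source is cuMemGetAllocationGranularity; the wiring below is a sketch under that assumption (CUDA-only, since this PR's HIP macro list does not alias that call), not code from the PR.

// Sketch: query the device's minimum allocation granularity, round the request
// up to it, then allocate through the new helper above.
#include <mscclpp/gpu_utils.hpp>

std::shared_ptr<mscclpp::PhysicalCudaMemory<char>> allocRoundedPhysical(size_t bytes) {
  int deviceId = -1;
  MSCCLPP_CUDATHROW(cudaGetDevice(&deviceId));

  CUmemAllocationProp prop = {};
  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
  prop.location.id = deviceId;

  size_t gran = 0;
  MSCCLPP_CUTHROW(cuMemGetAllocationGranularity(&gran, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM));

  size_t rounded = ((bytes + gran - 1) / gran) * gran;  // round up to a multiple of gran
  return mscclpp::allocSharedPhysicalCuda<char>(rounded, gran);
}
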
18 changes: 18 additions & 0 deletions include/mscclpp/nvls_device.hpp
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_NVLS_DEVICE_HPP_
#define MSCCLPP_NVLS_DEVICE_HPP_

#include <cstddef>  // for size_t

namespace mscclpp {

/// Device-side handle for @ref NvlsConnection::DeviceMulticastPointer.
struct DeviceMulticastPointerDeviceHandle {
void* devicePtr;
void* mcPtr;
size_t bufferSize;
};

} // namespace mscclpp

#endif  // MSCCLPP_NVLS_DEVICE_HPP_
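
On the device side, mcPtr is the multicast mapping that Hopper's multimem PTX instructions operate on (commit 28fd377 above moves the multimem instructions into the source). Below is a schematic kernel only: the name is hypothetical, cross-device synchronization is omitted, and the PTX is written from the public multimem.ld_reduce / multimem.st documentation rather than taken from this PR. It requires sm_90a and USE_NVLS.

// Schematic all-reduce over the multicast pointer: ld_reduce sums the element
// across every device bound to the multicast object, st broadcasts it back.
#include <mscclpp/nvls_device.hpp>

__global__ void multimemAllReduceSketch(mscclpp::DeviceMulticastPointerDeviceHandle handle, size_t nelem) {
  float* mc = reinterpret_cast<float*>(handle.mcPtr);
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < nelem; i += gridDim.x * blockDim.x) {
    float sum;
    asm volatile("multimem.ld_reduce.relaxed.sys.global.add.f32 %0, [%1];" : "=f"(sum) : "l"(mc + i));
    asm volatile("multimem.st.relaxed.sys.global.f32 [%0], %1;" ::"l"(mc + i), "f"(sum));
  }
}
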
2 changes: 2 additions & 0 deletions include/mscclpp/utils.hpp
@@ -37,6 +37,8 @@ struct ScopedTimer : public Timer {

std::string getHostName(int maxlen, const char delim);

bool isNvlsSupported();

} // namespace mscclpp

#endif // MSCCLPP_UTILS_HPP_
1 change: 1 addition & 0 deletions python/mscclpp/__init__.py
@@ -19,6 +19,7 @@
Transport,
TransportFlags,
version,
is_nvls_supported,
)

__version__ = version()
6 changes: 4 additions & 2 deletions python/mscclpp/comm.py
@@ -79,10 +79,12 @@ def my_ib_device(self, local_rank: int) -> Transport:
assert False # only 8 IBs are supported

def make_connection(
self, remote_ranks: list[int], transports: Transport | dict[int, Transport]
self, all_ranks: list[int], transports: Transport | dict[int, Transport]
) -> dict[int, Connection]:
if transports == Transport.Nvls:
return self.communicator.connct_nvls_collective(all_ranks, transports)
connections = {}
for rank in remote_ranks:
for rank in all_ranks:
if type(transports) is dict:
transport = transports[rank]
else: