Skip to content

Commit

Permalink
fixing format
Browse files Browse the repository at this point in the history
  • Loading branch information
caiomcbr committed Apr 15, 2024
1 parent 9448acd commit 4966faa
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 67 deletions.
28 changes: 14 additions & 14 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,23 @@ class TcpBootstrap : public Bootstrap {

/// Enumerates the available transport types.
enum class Transport {
Unknown, // Unknown transport type.
CudaIpc, // CUDA IPC transport type.
Nvls, // NVLS transport type.
IB0, // InfiniBand device 0 transport type.
IB1, // InfiniBand device 1 transport type.
IB2, // InfiniBand device 2 transport type.
IB3, // InfiniBand device 3 transport type.
IB4, // InfiniBand device 4 transport type.
IB5, // InfiniBand device 5 transport type.
IB6, // InfiniBand device 6 transport type.
IB7, // InfiniBand device 7 transport type.
Ethernet, // Ethernet transport type.
Unknown, // Unknown transport type.
CudaIpc, // CUDA IPC transport type.
Nvls, // NVLS transport type.
IB0, // InfiniBand device 0 transport type.
IB1, // InfiniBand device 1 transport type.
IB2, // InfiniBand device 2 transport type.
IB3, // InfiniBand device 3 transport type.
IB4, // InfiniBand device 4 transport type.
IB5, // InfiniBand device 5 transport type.
IB6, // InfiniBand device 6 transport type.
IB7, // InfiniBand device 7 transport type.
Ethernet, // Ethernet transport type.
NumTransports, // The number of transports.
};

// Short display names for the transports.
// NOTE(review): the 13 entries appear to follow the declaration order of the Transport
// enumerators (Unknown ... NumTransports) -- confirm before indexing by enum value.
const std::string TransportNames[] = {"UNK", "IPC", "NVLS", "IB0", "IB1", "IB2",
"IB3", "IB4", "IB5", "IB6", "IB7", "ETH", "NUM"};
// Short display names for the transports.
// NOTE(review): the 13 entries appear to follow the declaration order of the Transport
// enumerators (Unknown ... NumTransports) -- confirm before indexing by enum value.
const std::string TransportNames[] = {"UNK", "IPC", "NVLS", "IB0", "IB1", "IB2", "IB3",
"IB4", "IB5", "IB6", "IB7", "ETH", "NUM"};

namespace detail {
const size_t TransportFlagsSize = 12;
Expand Down
2 changes: 1 addition & 1 deletion src/bootstrap/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ void Socket::recv(void* ptr, int size) {
socketWait(MSCCLPP_SOCKET_RECV, ptr, size, &offset);
}

void Socket::recvUntilEnd(void *ptr, int size, int* closed){
void Socket::recvUntilEnd(void* ptr, int size, int* closed) {
int offset = 0;
*closed = 0;
if (state_ != SocketStateReady) {
Expand Down
44 changes: 23 additions & 21 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ void IBConnection::flush(int64_t timeoutUsec) {

// EthernetConnection

EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint) : stopRcvMessages_(false), abortFlag_(0) {
EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEndpoint)
: stopRcvMessages_(false), abortFlag_(0) {
// Validating Transport Protocol
if (localEndpoint.transport() != Transport::Ethernet || remoteEndpoint.transport() != Transport::Ethernet) {
throw mscclpp::Error("Ethernet connection can only be made from Ethernet endpoints", ErrorCode::InvalidUsage);
Expand All @@ -201,7 +202,8 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn
});

// Starting Connection
sendSocket_ = std::make_unique<Socket>(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_);
sendSocket_ =
std::make_unique<Socket>(&(getImpl(remoteEndpoint)->socketAddress_), 0xdeadbeef, SocketTypeBootstrap, abortFlag_);
sendSocket_->connect();

// Ensure the Connection was Established
Expand All @@ -213,7 +215,7 @@ EthernetConnection::EthernetConnection(Endpoint localEndpoint, Endpoint remoteEn
INFO(MSCCLPP_NET, "Ethernet connection created");
}

EthernetConnection::~EthernetConnection(){
EthernetConnection::~EthernetConnection() {
sendSocket_->close();
stopRcvMessages_ = true;
rcvSocket_->close();
Expand All @@ -231,18 +233,19 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe
validateTransport(src, transport());

// Initializing Variables
char* srcPtr = reinterpret_cast<char*>(src.data()) + srcOffset/sizeof(char);
char* dstPtr = reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset/sizeof(char);
char* srcPtr = reinterpret_cast<char*>(src.data()) + srcOffset / sizeof(char);
char* dstPtr = reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset / sizeof(char);
uint64_t sendSize = 0;

// Sending Info Data
sendSocket_->send(&dstPtr, sizeof(char*));
sendSocket_->send(&size, sizeof(uint64_t));

// Getting Data From GPU and Sending Data
while(sendSize < size){
uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize)/sizeof(char)) * sizeof(char);
mscclpp::memcpyCuda<char>(sendBuffer_, (char*)srcPtr + (sendSize/sizeof(char)), messageSize, cudaMemcpyDeviceToHost);
while (sendSize < size) {
uint64_t messageSize = std::min(sendBufferSize_, (size - sendSize) / sizeof(char)) * sizeof(char);
mscclpp::memcpyCuda<char>(sendBuffer_, (char*)srcPtr + (sendSize / sizeof(char)), messageSize,
cudaMemcpyDeviceToHost);
sendSocket_->send(sendBuffer_, messageSize);
sendSize += messageSize;
}
Expand All @@ -264,17 +267,14 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
sendSocket_->send(&dstPtr, sizeof(char*));
sendSocket_->send(&size, sizeof(uint64_t));
sendSocket_->send(src, size);

INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);
}

// Flush for the Ethernet connection. The visible body only emits an INFO message;
// timeoutUsec is accepted but not read here.
void EthernetConnection::flush(int64_t timeoutUsec) {

INFO(MSCCLPP_NET, "EthernetConnection flushing connection");
}
// Flush for the Ethernet connection. The visible body only emits an INFO message;
// timeoutUsec is accepted but not read here.
void EthernetConnection::flush(int64_t timeoutUsec) { INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); }

void EthernetConnection::rcvMessages(){
void EthernetConnection::rcvMessages() {
// Receiving Messages Until Connection is Closed
while (!stopRcvMessages_) {
// Declaring Variables
Expand All @@ -285,20 +285,22 @@ void EthernetConnection::rcvMessages(){
bool received = true;

// Receiving Data Address
if(closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
if (closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
received &= !closed;

// Receiving data size
if(closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
if (closed == 0) rcvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
received &= !closed;

// Receiving Data and Copying Data to GPU
while(rcvSize < size && closed == 0){
uint64_t messageSize = std::min(rcvBufferSize_, (size - rcvSize)/sizeof(char)) * sizeof(char);
while (rcvSize < size && closed == 0) {
uint64_t messageSize = std::min(rcvBufferSize_, (size - rcvSize) / sizeof(char)) * sizeof(char);
rcvSocket_->recvUntilEnd(rcvBuffer_, messageSize, &closed);
received &= !closed;

if(received) mscclpp::memcpyCuda<char>((char*)ptr + (rcvSize/sizeof(char)), rcvBuffer_, messageSize, cudaMemcpyHostToDevice);
if (received)
mscclpp::memcpyCuda<char>((char*)ptr + (rcvSize / sizeof(char)), rcvBuffer_, messageSize,
cudaMemcpyHostToDevice);
rcvSize += messageSize;
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ MSCCLPP_API_CPP std::shared_ptr<Connection> Context::connect(Endpoint localEndpo
throw mscclpp::Error("Local transport is IB but remote is not", ErrorCode::InvalidUsage);
}
conn = std::make_shared<IBConnection>(localEndpoint, remoteEndpoint, *this);
} else if(localEndpoint.transport() == Transport::Ethernet) {
} else if (localEndpoint.transport() == Transport::Ethernet) {
if (remoteEndpoint.transport() != Transport::Ethernet) {
throw mscclpp::Error("Local transport is Ethernet but remote is not", ErrorCode::InvalidUsage);
}
conn = std::make_shared<EthernetConnection>(localEndpoint, remoteEndpoint);
}
else {
} else {
throw mscclpp::Error("Unsupported transport", ErrorCode::InternalError);
}

Expand Down
10 changes: 5 additions & 5 deletions src/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

#include "api.h"
#include "context.hpp"
#include "utils_internal.hpp"
#include "socket.h"
#include "utils_internal.hpp"

namespace mscclpp {

Expand All @@ -16,8 +16,7 @@ Endpoint::Impl::Impl(EndpointConfig config, Context::Impl& contextImpl)
ibQp_ = contextImpl.getIbContext(transport_)
->createQp(config.ibMaxCqSize, config.ibMaxCqPollNum, config.ibMaxSendWr, 0, config.ibMaxWrPerSend);
ibQpInfo_ = ibQp_->getInfo();
}
else if(transport_ == Transport::Ethernet) {
} else if (transport_ == Transport::Ethernet) {
// Configuring Ethernet Interfaces
abortFlag_ = 0;
int ret = FindInterfaces(netIfName_, &socketAddress_, MAX_IF_NAME_SIZE, 1, "");
Expand All @@ -40,7 +39,8 @@ MSCCLPP_API_CPP std::vector<char> Endpoint::serialize() {
std::copy_n(reinterpret_cast<char*>(&pimpl_->ibQpInfo_), sizeof(pimpl_->ibQpInfo_), std::back_inserter(data));
}
if ((pimpl_->transport_) == Transport::Ethernet) {
std::copy_n(reinterpret_cast<char*>(&pimpl_->socketAddress_), sizeof(pimpl_->socketAddress_), std::back_inserter(data));
std::copy_n(reinterpret_cast<char*>(&pimpl_->socketAddress_), sizeof(pimpl_->socketAddress_),
std::back_inserter(data));
}
return data;
}
Expand All @@ -60,7 +60,7 @@ Endpoint::Impl::Impl(const std::vector<char>& serialization) {
std::copy_n(it, sizeof(ibQpInfo_), reinterpret_cast<char*>(&ibQpInfo_));
it += sizeof(ibQpInfo_);
}
if (transport_ == Transport::Ethernet){
if (transport_ == Transport::Ethernet) {
std::copy_n(it, sizeof(socketAddress_), reinterpret_cast<char*>(&socketAddress_));
it += sizeof(socketAddress_);
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Socket {
void accept(const Socket* listenSocket, int64_t timeout = -1);
void send(void* ptr, int size);
void recv(void* ptr, int size);
void recvUntilEnd(void *ptr, int size, int* closed);
void recvUntilEnd(void* ptr, int size, int* closed);
void close();

int getFd() const { return fd_; }
Expand Down
10 changes: 4 additions & 6 deletions test/mp_unit/communicator_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ void CommunicatorTestBase::connectMesh(bool useIpc, bool useIb, bool useEthernet
if (i != gEnv->rank) {
if ((rankToNode(i) == rankToNode(gEnv->rank)) && useIpc) {
connectionFutures[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc);
}
else if(useIb) {
} else if (useIb) {
connectionFutures[i] = communicator->connectOnSetup(i, 0, ibTransport);
}
else if(useEthernet) {
} else if (useEthernet) {
connectionFutures[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::Ethernet);
}
}
Expand Down Expand Up @@ -118,8 +116,8 @@ void CommunicatorTest::SetUp() {
devicePtr[n] = mscclpp::allocSharedCuda<int>(deviceBufferSize / sizeof(int));
registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks,
localMemory[n], remoteMemory[n]);
//registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks,
// localMemory[n], remoteMemory[n]);
// registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::Ethernet, 0, remoteRanks,
// localMemory[n], remoteMemory[n]);
}
}

Expand Down
5 changes: 3 additions & 2 deletions test/mp_unit/mp_unit_tests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase {
void SetUp() override;
void TearDown() override;

void setupMeshConnections(std::vector<mscclpp::SimpleProxyChannel>& proxyChannels, bool useIPC, bool useIb, bool useEthernet,
void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0);
void setupMeshConnections(std::vector<mscclpp::SimpleProxyChannel>& proxyChannels, bool useIPC, bool useIb,
bool useEthernet, void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr,
size_t recvBuffBytes = 0);
void testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll);
void testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll);
void testPacketPingPong(bool useIbOnly);
Expand Down
27 changes: 13 additions & 14 deletions test/mp_unit/proxy_channel_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ void ProxyChannelOneToOneTest::SetUp() {
// Teardown simply forwards to the CommunicatorTestBase implementation; nothing fixture-specific is done here.
void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); }

void ProxyChannelOneToOneTest::setupMeshConnections(std::vector<mscclpp::SimpleProxyChannel>& proxyChannels,
bool useIPC, bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes,
void* recvBuff, size_t recvBuffBytes) {
bool useIPC, bool useIb, bool useEthernet, void* sendBuff,
size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) {
const int rank = communicator->bootstrap()->getRank();
const int worldSize = communicator->bootstrap()->getNranks();
const bool isInPlace = (recvBuff == nullptr);
mscclpp::TransportFlags transport;
mscclpp::TransportFlags transport;

if(useIPC) transport |= mscclpp::Transport::CudaIpc;
if(useIb) transport |= ibTransport;
if(useEthernet) transport |= mscclpp::Transport::Ethernet;
if (useIPC) transport |= mscclpp::Transport::CudaIpc;
if (useIb) transport |= ibTransport;
if (useEthernet) transport |= mscclpp::Transport::Ethernet;

std::vector<mscclpp::NonblockingFuture<std::shared_ptr<mscclpp::Connection>>> connectionFutures(worldSize);
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemFutures(worldSize);
Expand All @@ -42,10 +42,9 @@ void ProxyChannelOneToOneTest::setupMeshConnections(std::vector<mscclpp::SimpleP
}
if ((rankToNode(r) == rankToNode(gEnv->rank)) && useIPC) {
connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::CudaIpc);
} else if(useIb) {
} else if (useIb) {
connectionFutures[r] = communicator->connectOnSetup(r, 0, ibTransport);
}
else if(useEthernet) {
} else if (useEthernet) {
connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::Ethernet);
}

Expand Down Expand Up @@ -243,7 +242,7 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool us

// Ping-pong with useIPC=true, useIB=true, useEthernet=false, waitWithPoll=false.
TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(true, true, false, false); }

// Ping-pong with useIPC=false, useIB=true, useEthernet=false, waitWithPoll=false.
TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true , false, false); }
// Ping-pong with useIPC=false, useIB=true, useEthernet=false, waitWithPoll=false.
TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true, false, false); }

// Ping-pong with useIPC=false, useIB=false, useEthernet=true, waitWithPoll=false.
TEST_F(ProxyChannelOneToOneTest, PingPongEthernet) { testPingPong(false, false, true, false); }

Expand Down Expand Up @@ -335,8 +334,8 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) {
auto putPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);
auto getPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);

setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));
setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(),
nPacket * sizeof(mscclpp::LLPacket), getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));

ASSERT_EQ(proxyChannels.size(), 1);

Expand Down Expand Up @@ -402,8 +401,8 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) {
auto putPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);
auto getPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);

setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));
setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(),
nPacket * sizeof(mscclpp::LLPacket), getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));

ASSERT_EQ(proxyChannels.size(), 1);

Expand Down

0 comments on commit 4966faa

Please sign in to comment.