Skip to content

Commit

Permalink
implementing and proxytest of updateAndSync in EthernetConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
caiomcbr committed Apr 5, 2024
1 parent 9f3e1af commit 1c9a9f3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/bootstrap/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ void Socket::recvUntilEnd(void *ptr, int size, int* closed){
*closed = 0;
if (state_ != SocketStateReady) {
std::stringstream ss;
ss << "socket state (" << state_ << ") is not ready";
ss << "socket state (" << state_ << ") is not ready in recvUntilEnd";
throw Error(ss.str(), ErrorCode::InternalError);
}

Expand Down
28 changes: 17 additions & 11 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,20 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe
}

void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
// Validating Transport Protocol
validateTransport(dst, remoteTransport());

// Initalizing Variables
uint64_t oldValue = *src;
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset);
uint64_t size = sizeof(uint64_t);
*src = newValue;
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);

*dstPtr = newValue;
// Sending Data
sendSocket_->send(&dstPtr, sizeof(char*));
sendSocket_->send(&size, sizeof(uint64_t));
sendSocket_->send(src, size);

INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);
}
Expand All @@ -260,17 +268,15 @@ void EthernetConnection::flush(int64_t timeoutUsec) {
}

void EthernetConnection::rcvMessages(){
// Declarating Variables
char* ptr = (char*)malloc(sizeof(char));
char* buffer;
uint64_t size;
int closed = 0;
bool received;

// Receiving Messages Until Connection is Closed
while (!stopRcvMessages_) {
received = true;

// Declarating Variables
char* ptr;
char* buffer;
uint64_t size;
int closed = 0;
bool received = true;

// Receiving Data Address
if(closed == 0) rcvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
received &= !closed;
Expand Down
8 changes: 4 additions & 4 deletions test/mp_unit/mp_unit_tests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase {
void SetUp() override;
void TearDown() override;

void setupMeshConnections(std::vector<mscclpp::SimpleProxyChannel>& proxyChannels, bool useIbOnly, void* sendBuff,
size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0);
void testPingPong(bool useIbOnly, bool waitWithPoll);
void testPingPongPerf(bool useIbOnly, bool waitWithPoll);
void setupMeshConnections(std::vector<mscclpp::SimpleProxyChannel>& proxyChannels, bool useIPC, bool useIb, bool useEthernet,
void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0);
void testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll);
void testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll);
void testPacketPingPong(bool useIbOnly);
void testPacketPingPongPerf(bool useIbOnly);

Expand Down
44 changes: 28 additions & 16 deletions test/mp_unit/proxy_channel_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ void ProxyChannelOneToOneTest::SetUp() {
void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); }

void ProxyChannelOneToOneTest::setupMeshConnections(std::vector<mscclpp::SimpleProxyChannel>& proxyChannels,
bool useIbOnly, void* sendBuff, size_t sendBuffBytes,
bool useIPC, bool useIb, bool useEthernet, void* sendBuff, size_t sendBuffBytes,
void* recvBuff, size_t recvBuffBytes) {
const int rank = communicator->bootstrap()->getRank();
const int worldSize = communicator->bootstrap()->getNranks();
const bool isInPlace = (recvBuff == nullptr);
mscclpp::TransportFlags transport = (useIbOnly) ? ibTransport : (mscclpp::Transport::CudaIpc | ibTransport);
mscclpp::TransportFlags transport;

if(useIPC) transport |= mscclpp::Transport::CudaIpc;
if(useIb) transport |= ibTransport;
if(useEthernet) transport |= mscclpp::Transport::Ethernet;

std::vector<mscclpp::NonblockingFuture<std::shared_ptr<mscclpp::Connection>>> connectionFutures(worldSize);
std::vector<mscclpp::NonblockingFuture<mscclpp::RegisteredMemory>> remoteMemFutures(worldSize);
Expand All @@ -36,11 +40,14 @@ void ProxyChannelOneToOneTest::setupMeshConnections(std::vector<mscclpp::SimpleP
if (r == rank) {
continue;
}
if ((rankToNode(r) == rankToNode(gEnv->rank)) && !useIbOnly) {
if ((rankToNode(r) == rankToNode(gEnv->rank)) && useIPC) {
connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::CudaIpc);
} else {
} else if(useIb) {
connectionFutures[r] = communicator->connectOnSetup(r, 0, ibTransport);
}
else if(useEthernet) {
connectionFutures[r] = communicator->connectOnSetup(r, 0, mscclpp::Transport::Ethernet);
}

if (isInPlace) {
communicator->sendMemoryOnSetup(sendBufRegMem, r, 0);
Expand Down Expand Up @@ -145,14 +152,14 @@ __global__ void kernelProxyPingPong(int* buff, int rank, int nElem, bool waitWit
}
}

void ProxyChannelOneToOneTest::testPingPong(bool useIbOnly, bool waitWithPoll) {
void ProxyChannelOneToOneTest::testPingPong(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll) {
if (gEnv->rank >= numRanksToUse) return;

const int nElem = 4 * 1024 * 1024;

std::vector<mscclpp::SimpleProxyChannel> proxyChannels;
std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
setupMeshConnections(proxyChannels, useIbOnly, buff.get(), nElem * sizeof(int));
setupMeshConnections(proxyChannels, useIPC, useIB, useEthernet, buff.get(), nElem * sizeof(int));

std::vector<DeviceHandle<mscclpp::SimpleProxyChannel>> proxyChannelHandles;
for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle());
Expand Down Expand Up @@ -190,14 +197,15 @@ void ProxyChannelOneToOneTest::testPingPong(bool useIbOnly, bool waitWithPoll) {
proxyService->stopProxy();
}

void ProxyChannelOneToOneTest::testPingPongPerf(bool useIbOnly, bool waitWithPoll) {
void ProxyChannelOneToOneTest::testPingPongPerf(bool useIPC, bool useIB, bool useEthernet, bool waitWithPoll) {
if (gEnv->rank >= numRanksToUse) return;

const int nElem = 4 * 1024 * 1024;
//const int nElem = 256000000;

std::vector<mscclpp::SimpleProxyChannel> proxyChannels;
std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
setupMeshConnections(proxyChannels, useIbOnly, buff.get(), nElem * sizeof(int));
setupMeshConnections(proxyChannels, useIPC, useIB, useEthernet, buff.get(), nElem * sizeof(int));

std::vector<DeviceHandle<mscclpp::SimpleProxyChannel>> proxyChannelHandles;
for (auto& ch : proxyChannels) proxyChannelHandles.push_back(ch.deviceHandle());
Expand Down Expand Up @@ -234,17 +242,21 @@ void ProxyChannelOneToOneTest::testPingPongPerf(bool useIbOnly, bool waitWithPol
proxyService->stopProxy();
}

TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(false, false); }
TEST_F(ProxyChannelOneToOneTest, PingPong) { testPingPong(true, true, false, false); }

TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(false, true , false, false); }

TEST_F(ProxyChannelOneToOneTest, PingPongEthernet) { testPingPong(false, false, true, false); }

TEST_F(ProxyChannelOneToOneTest, PingPongIb) { testPingPong(true, false); }
TEST_F(ProxyChannelOneToOneTest, PingPongWithPoll) { testPingPong(true, true, false, true); }

TEST_F(ProxyChannelOneToOneTest, PingPongWithPoll) { testPingPong(false, true); }
TEST_F(ProxyChannelOneToOneTest, PingPongIbWithPoll) { testPingPong(false, true, false, true); }

TEST_F(ProxyChannelOneToOneTest, PingPongIbWithPoll) { testPingPong(true, true); }
TEST_F(ProxyChannelOneToOneTest, PingPongPerf) { testPingPongPerf(true, true, false, false); }

TEST_F(ProxyChannelOneToOneTest, PingPongPerf) { testPingPongPerf(false, false); }
TEST_F(ProxyChannelOneToOneTest, PingPongPerfIb) { testPingPongPerf(false, true, false, false); }

TEST_F(ProxyChannelOneToOneTest, PingPongPerfIb) { testPingPongPerf(true, false); }
TEST_F(ProxyChannelOneToOneTest, PingPongPerfEthernet) { testPingPongPerf(false, false, true, false); }

__device__ mscclpp::DeviceSyncer gChannelOneToOneTestProxyChansSyncer;

Expand Down Expand Up @@ -324,7 +336,7 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) {
auto putPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);
auto getPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);

setupMeshConnections(proxyChannels, useIbOnly, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));

ASSERT_EQ(proxyChannels.size(), 1);
Expand Down Expand Up @@ -391,7 +403,7 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) {
auto putPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);
auto getPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);

setupMeshConnections(proxyChannels, useIbOnly, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
setupMeshConnections(proxyChannels, !useIbOnly, true, false, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));

ASSERT_EQ(proxyChannels.size(), 1);
Expand Down

0 comments on commit 1c9a9f3

Please sign in to comment.