Skip to content

Commit

Permalink
Add connection events for NPKit (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
yzygitzh authored Dec 4, 2024
1 parent 88d28e0 commit f6305a3
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 10 deletions.
33 changes: 29 additions & 4 deletions include/mscclpp/npkit/npkit_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,35 @@
// NPKit event IDs. Every macro is defined exactly once and no two event IDs
// share a value.

// Time-synchronization events.
#define NPKIT_EVENT_TIME_SYNC_GPU 0x1
#define NPKIT_EVENT_TIME_SYNC_CPU 0x2

// CudaIpcConnection events (write / updateAndSync / flush).
#define NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY 0x3
#define NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT 0x4
#define NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY 0x5
#define NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT 0x6
#define NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY 0x7
#define NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT 0x8

// IBConnection events (write / updateAndSync / flush).
#define NPKIT_EVENT_CONN_IB_WRITE_ENTRY 0x9
#define NPKIT_EVENT_CONN_IB_WRITE_EXIT 0xA
#define NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY 0xB
#define NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT 0xC
#define NPKIT_EVENT_CONN_IB_FLUSH_ENTRY 0xD
#define NPKIT_EVENT_CONN_IB_FLUSH_EXIT 0xE

// EthernetConnection events (write / updateAndSync / flush, plus the
// metadata and data phases of the receive loop).
#define NPKIT_EVENT_CONN_ETH_WRITE_ENTRY 0xF
#define NPKIT_EVENT_CONN_ETH_WRITE_EXIT 0x10
#define NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY 0x11
#define NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT 0x12
#define NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY 0x13
#define NPKIT_EVENT_CONN_ETH_FLUSH_EXIT 0x14
#define NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY 0x15
#define NPKIT_EVENT_CONN_ETH_RECV_META_EXIT 0x16
#define NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY 0x17
#define NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT 0x18

// Executor events. These are numbered after the connection events; the
// earlier 0x3/0x4/0x5/0x18 values would collide with the connection IDs above.
#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x19
#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x1A

// NOTE(review): presumably base IDs to which a per-operation offset is added
// (ENTRY and EXIT are 0x13 apart) -- confirm against the executor code.
#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x1B
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x2E

#endif
95 changes: 89 additions & 6 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ Transport CudaIpcConnection::remoteTransport() { return Transport::CudaIpc; }

void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);

Expand All @@ -71,10 +75,16 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register
MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size);

// npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport());
uint64_t oldValue = *src;
*src = newValue;
Expand All @@ -84,17 +94,26 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
INFO(MSCCLPP_P2P, "CudaIpcConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);

// npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

// Waits for all work previously enqueued on this connection's stream to
// finish. A non-negative timeoutUsec is not honored: it is only reported in
// the log and the call still waits for the stream to drain.
void CudaIpcConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

  const bool timeoutRequested = (timeoutUsec >= 0);
  if (timeoutRequested) {
    INFO(MSCCLPP_P2P, "CudaIpcConnection flush: timeout is not supported, ignored");
  }

  // The guard object lives until the function returns, so it covers the
  // synchronize call below.
  // NOTE(review): presumably keeps the sync out of an active CUDA graph capture -- confirm.
  AvoidCudaGraphCaptureGuard guard;
  MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream_));
  INFO(MSCCLPP_P2P, "CudaIpcConnection flushing connection");

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT)
  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

// IBConnection
Expand All @@ -118,6 +137,10 @@ Transport IBConnection::remoteTransport() { return remoteTransport_; }

void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_ENTRY)
  // Record the IB write-entry event. The event ID must be the IB one that
  // matches the guard above; using the CUDA IPC ID would attribute IB writes
  // to the CUDA IPC connection in the trace. The payload is size narrowed to 32 bits.
  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);

Expand All @@ -139,10 +162,17 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem
qp->postSend();
INFO(MSCCLPP_NET, "IBConnection write: from %p to %p, size %lu", (uint8_t*)srcMr->getBuff() + srcOffset,
(uint8_t*)dstMrInfo.addr + dstOffset, size);
// npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_EXIT)
  // Record the IB write-exit event. The event ID must be the IB one that
  // matches the guard above; using the CUDA IPC ID would attribute IB writes
  // to the CUDA IPC connection in the trace. The payload is size narrowed to 32 bits.
  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport());
auto dstTransportInfo = getImpl(dst)->getTransportInfo(remoteTransport());
if (dstTransportInfo.ibLocal) {
Expand All @@ -159,9 +189,17 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6
qp->postSend();
INFO(MSCCLPP_NET, "IBConnection atomic Write: from %p to %p, %lu -> %lu", src, (uint8_t*)dstMrInfo.addr + dstOffset,
oldValue, newValue);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void IBConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

Timer timer;
while (qp->getNumCqItems()) {
int wcNum = qp->pollCq();
Expand All @@ -183,7 +221,10 @@ void IBConnection::flush(int64_t timeoutUsec) {
}
}
INFO(MSCCLPP_NET, "IBConnection flushing connection");
// npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

// EthernetConnection
Expand Down Expand Up @@ -233,6 +274,10 @@ Transport EthernetConnection::remoteTransport() { return Transport::Ethernet; }

void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif

// Validating Transport Protocol
validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);
Expand Down Expand Up @@ -264,9 +309,17 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe
}

INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

// Validating Transport Protocol
validateTransport(dst, remoteTransport());

Expand All @@ -293,9 +346,23 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,

INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

// Flush for the Ethernet transport. The body only logs the request and
// optionally records NPKit entry/exit events; the timeout argument is
// unnamed and unused.
// This is the single definition of the function: a second, uninstrumented
// one-line definition alongside it would be a redefinition error.
void EthernetConnection::flush(int64_t /*timeoutUsec*/) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

  INFO(MSCCLPP_NET, "EthernetConnection flushing connection");

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_EXIT)
  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void EthernetConnection::recvMessages() {
// Declaring Variables
Expand All @@ -307,6 +374,10 @@ void EthernetConnection::recvMessages() {

// Receiving Messages Until Connection is Closed
while (recvSocket_->getState() != SocketStateClosed) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 1);
#endif

// Receiving Data Address
if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
received &= !closed;
Expand All @@ -315,6 +386,14 @@ void EthernetConnection::recvMessages() {
if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
received &= !closed;

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif

// Receiving Data and Copying Data to GPU
recvSize = 0;
while (recvSize < size && closed == 0) {
Expand All @@ -327,6 +406,10 @@ void EthernetConnection::recvMessages() {
cudaMemcpyHostToDevice);
recvSize += messageSize;
}

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif
}
}

Expand Down

0 comments on commit f6305a3

Please sign in to comment.