From 4a4d766bdb5d851dcc1d30c8ca6024a0101bddf7 Mon Sep 17 00:00:00 2001
From: Ziyue Yang
Date: Mon, 18 Nov 2024 01:20:43 +0000
Subject: [PATCH] Add connection events for NPKit

---
 include/mscclpp/npkit/npkit_event.hpp | 33 ++++++++--
 src/connection.cc                     | 95 +++++++++++++++++++++++++--
 2 files changed, 118 insertions(+), 10 deletions(-)

Review notes (this commentary region is not part of the applied change):
  * IBConnection::write recorded NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY/EXIT
    under the ENABLE_NPKIT_EVENT_CONN_IB_WRITE_ENTRY/EXIT guards. It now
    records NPKIT_EVENT_CONN_IB_WRITE_ENTRY/EXIT, matching the guards and
    the other IB events, so IB writes are no longer traced as CUDA IPC writes.
  * Line breaks were restored from a whitespace-collapsed copy of the patch.
    Blank context lines were placed to satisfy the hunk line counts, and the
    indentation of context lines is reconstructed; verify against the target
    tree (or apply with --ignore-whitespace) if a hunk is rejected.

diff --git a/include/mscclpp/npkit/npkit_event.hpp b/include/mscclpp/npkit/npkit_event.hpp
index 1a24b241..d51b897d 100644
--- a/include/mscclpp/npkit/npkit_event.hpp
+++ b/include/mscclpp/npkit/npkit_event.hpp
@@ -9,10 +9,35 @@
 #define NPKIT_EVENT_TIME_SYNC_GPU 0x1
 #define NPKIT_EVENT_TIME_SYNC_CPU 0x2
 
-#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3
-#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4
+#define NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY 0x3
+#define NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT 0x4
+#define NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY 0x5
+#define NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT 0x6
+#define NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY 0x7
+#define NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT 0x8
 
-#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5
-#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x17
+#define NPKIT_EVENT_CONN_IB_WRITE_ENTRY 0x9
+#define NPKIT_EVENT_CONN_IB_WRITE_EXIT 0xA
+#define NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY 0xB
+#define NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT 0xC
+#define NPKIT_EVENT_CONN_IB_FLUSH_ENTRY 0xD
+#define NPKIT_EVENT_CONN_IB_FLUSH_EXIT 0xE
+
+#define NPKIT_EVENT_CONN_ETH_WRITE_ENTRY 0xF
+#define NPKIT_EVENT_CONN_ETH_WRITE_EXIT 0x10
+#define NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY 0x11
+#define NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT 0x12
+#define NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY 0x13
+#define NPKIT_EVENT_CONN_ETH_FLUSH_EXIT 0x14
+#define NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY 0x15
+#define NPKIT_EVENT_CONN_ETH_RECV_META_EXIT 0x16
+#define NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY 0x17
+#define NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT 0x18
+
+#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x19
+#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x1A
+
+#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x1B
+#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x2D
 
 #endif
diff --git a/src/connection.cc b/src/connection.cc
index 79c4c963..6a5b554d 100644
--- a/src/connection.cc
+++ b/src/connection.cc
@@ -62,6 +62,10 @@ Transport CudaIpcConnection::remoteTransport() { return Transport::CudaIpc; }
 
 void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
                               uint64_t size) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   validateTransport(dst, remoteTransport(), dstOffset, size);
   validateTransport(src, transport(), srcOffset, size);
 
@@ -71,10 +75,16 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register
   MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream_));
   INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size);
 
-  // npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   validateTransport(dst, remoteTransport());
   uint64_t oldValue = *src;
   *src = newValue;
@@ -84,17 +94,26 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
   INFO(MSCCLPP_P2P, "CudaIpcConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
        newValue);
 
-  // npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 void CudaIpcConnection::flush(int64_t timeoutUsec) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   if (timeoutUsec >= 0) {
     INFO(MSCCLPP_P2P, "CudaIpcConnection flush: timeout is not supported, ignored");
   }
   AvoidCudaGraphCaptureGuard guard;
   MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream_));
-  // npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT);
   INFO(MSCCLPP_P2P, "CudaIpcConnection flushing connection");
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 // IBConnection
@@ -118,6 +137,10 @@ Transport IBConnection::remoteTransport() { return remoteTransport_; }
 
 void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
                          uint64_t size) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   validateTransport(dst, remoteTransport(), dstOffset, size);
   validateTransport(src, transport(), srcOffset, size);
 
@@ -139,10 +162,17 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem
   qp->postSend();
   INFO(MSCCLPP_NET, "IBConnection write: from %p to %p, size %lu", (uint8_t*)srcMr->getBuff() + srcOffset,
        (uint8_t*)dstMrInfo.addr + dstOffset, size);
-  // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size);
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   validateTransport(dst, remoteTransport());
   auto dstTransportInfo = getImpl(dst)->getTransportInfo(remoteTransport());
   if (dstTransportInfo.ibLocal) {
@@ -159,9 +189,17 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6
   qp->postSend();
   INFO(MSCCLPP_NET, "IBConnection atomic Write: from %p to %p, %lu -> %lu", src, (uint8_t*)dstMrInfo.addr + dstOffset,
        oldValue, newValue);
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 void IBConnection::flush(int64_t timeoutUsec) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   Timer timer;
   while (qp->getNumCqItems()) {
     int wcNum = qp->pollCq();
@@ -183,7 +221,10 @@ void IBConnection::flush(int64_t timeoutUsec) {
     }
   }
   INFO(MSCCLPP_NET, "IBConnection flushing connection");
-  // npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT);
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 // EthernetConnection
@@ -233,6 +274,10 @@ Transport EthernetConnection::remoteTransport() { return Transport::Ethernet; }
 
 void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
                                uint64_t size) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   // Validating Transport Protocol
   validateTransport(dst, remoteTransport(), dstOffset, size);
   validateTransport(src, transport(), srcOffset, size);
@@ -264,9 +309,17 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe
   }
 
   INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size);
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
 void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
   // Validating Transport Protocol
   validateTransport(dst, remoteTransport());
 
@@ -293,9 +346,23 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
 
   INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
        newValue);
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
 }
 
-void EthernetConnection::flush(int64_t) { INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); }
+void EthernetConnection::flush(int64_t) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+
+  INFO(MSCCLPP_NET, "EthernetConnection flushing connection");
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_EXIT)
+  NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
+#endif
+}
 
 void EthernetConnection::recvMessages() {
   // Declarating Variables
@@ -307,6 +374,10 @@ void EthernetConnection::recvMessages() {
 
   // Receiving Messages Until Connection is Closed
   while (recvSocket_->getState() != SocketStateClosed) {
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY)
+    NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 1);
+#endif
+
     // Receiving Data Address
     if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
     received &= !closed;
@@ -315,6 +386,14 @@ void EthernetConnection::recvMessages() {
     if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
     received &= !closed;
 
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_EXIT)
+    NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
+#endif
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY)
+    NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
+#endif
+
     // Receiving Data and Copying Data yo GPU
     recvSize = 0;
     while (recvSize < size && closed == 0) {
@@ -327,6 +406,10 @@ void EthernetConnection::recvMessages() {
                             cudaMemcpyHostToDevice);
       recvSize += messageSize;
     }
+
+#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT)
+    NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
+#endif
   }
 }
 