Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection events for NPKit #386

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions include/mscclpp/npkit/npkit_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,35 @@
#define NPKIT_EVENT_TIME_SYNC_GPU 0x1
#define NPKIT_EVENT_TIME_SYNC_CPU 0x2

#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x3
#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x4
#define NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY 0x3
#define NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT 0x4
#define NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY 0x5
#define NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT 0x6
#define NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY 0x7
#define NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT 0x8

#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x5
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x17
#define NPKIT_EVENT_CONN_IB_WRITE_ENTRY 0x9
#define NPKIT_EVENT_CONN_IB_WRITE_EXIT 0xA
#define NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY 0xB
#define NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT 0xC
#define NPKIT_EVENT_CONN_IB_FLUSH_ENTRY 0xD
#define NPKIT_EVENT_CONN_IB_FLUSH_EXIT 0xE

#define NPKIT_EVENT_CONN_ETH_WRITE_ENTRY 0xF
#define NPKIT_EVENT_CONN_ETH_WRITE_EXIT 0x10
#define NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY 0x11
#define NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT 0x12
#define NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY 0x13
#define NPKIT_EVENT_CONN_ETH_FLUSH_EXIT 0x14
#define NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY 0x15
#define NPKIT_EVENT_CONN_ETH_RECV_META_EXIT 0x16
#define NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY 0x17
#define NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT 0x18

#define NPKIT_EVENT_EXECUTOR_INIT_ENTRY 0x19
#define NPKIT_EVENT_EXECUTOR_INIT_EXIT 0x1A

#define NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY 0x1B
#define NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT 0x2D

#endif
95 changes: 89 additions & 6 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ Transport CudaIpcConnection::remoteTransport() { return Transport::CudaIpc; }

void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);

Expand All @@ -71,10 +75,16 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register
MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, srcPtr + srcOffset, size, cudaMemcpyDeviceToDevice, stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, size %lu", srcPtr + srcOffset, dstPtr + dstOffset, size);

// npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport());
uint64_t oldValue = *src;
*src = newValue;
Expand All @@ -84,17 +94,26 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
INFO(MSCCLPP_P2P, "CudaIpcConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);

// npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void CudaIpcConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

if (timeoutUsec >= 0) {
INFO(MSCCLPP_P2P, "CudaIpcConnection flush: timeout is not supported, ignored");
}
AvoidCudaGraphCaptureGuard guard;
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream_));
// npkitCollectExitEvents(conn, NPKIT_EVENT_DMA_SEND_EXIT);
INFO(MSCCLPP_P2P, "CudaIpcConnection flushing connection");

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

// IBConnection
Expand All @@ -118,6 +137,10 @@ Transport IBConnection::remoteTransport() { return remoteTransport_; }

void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);

Expand All @@ -139,10 +162,17 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem
qp->postSend();
INFO(MSCCLPP_NET, "IBConnection write: from %p to %p, size %lu", (uint8_t*)srcMr->getBuff() + srcOffset,
(uint8_t*)dstMrInfo.addr + dstOffset, size);
// npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

validateTransport(dst, remoteTransport());
auto dstTransportInfo = getImpl(dst)->getTransportInfo(remoteTransport());
if (dstTransportInfo.ibLocal) {
Expand All @@ -159,9 +189,17 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6
qp->postSend();
INFO(MSCCLPP_NET, "IBConnection atomic Write: from %p to %p, %lu -> %lu", src, (uint8_t*)dstMrInfo.addr + dstOffset,
oldValue, newValue);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void IBConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

Timer timer;
while (qp->getNumCqItems()) {
int wcNum = qp->pollCq();
Expand All @@ -183,7 +221,10 @@ void IBConnection::flush(int64_t timeoutUsec) {
}
}
INFO(MSCCLPP_NET, "IBConnection flushing connection");
// npkitCollectExitEvents(conn, NPKIT_EVENT_IB_SEND_EXIT);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

// EthernetConnection
Expand Down Expand Up @@ -233,6 +274,10 @@ Transport EthernetConnection::remoteTransport() { return Transport::Ethernet; }

void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif

// Validating Transport Protocol
validateTransport(dst, remoteTransport(), dstOffset, size);
validateTransport(src, transport(), srcOffset, size);
Expand Down Expand Up @@ -264,9 +309,17 @@ void EthernetConnection::write(RegisteredMemory dst, uint64_t dstOffset, Registe
}

INFO(MSCCLPP_NET, "EthernetConnection write: from %p to %p, size %lu", srcPtr, dstPtr, size);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_WRITE_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_WRITE_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

// Validating Transport Protocol
validateTransport(dst, remoteTransport());

Expand All @@ -293,9 +346,23 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,

INFO(MSCCLPP_NET, "EthernetConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_UPDATE_AND_SYNC_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void EthernetConnection::flush(int64_t) { INFO(MSCCLPP_NET, "EthernetConnection flushing connection"); }
void EthernetConnection::flush(int64_t) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif

INFO(MSCCLPP_NET, "EthernetConnection flushing connection");

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_EXIT, 0, 0, *NpKit::GetCpuTimestamp(), 0);
#endif
}

void EthernetConnection::recvMessages() {
// Declarating Variables
Expand All @@ -307,6 +374,10 @@ void EthernetConnection::recvMessages() {

// Receiving Messages Until Connection is Closed
while (recvSocket_->getState() != SocketStateClosed) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 1);
#endif

// Receiving Data Address
if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
received &= !closed;
Expand All @@ -315,6 +386,14 @@ void EthernetConnection::recvMessages() {
if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
received &= !closed;

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif

// Receiving Data and Copying Data yo GPU
recvSize = 0;
while (recvSize < size && closed == 0) {
Expand All @@ -327,6 +406,10 @@ void EthernetConnection::recvMessages() {
cudaMemcpyHostToDevice);
recvSize += messageSize;
}

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif
}
}

Expand Down
Loading