Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSRCHA-371 workaround #259

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/mscclpp/nvls_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct DeviceMulticastPointerDeviceHandle {
void* mcPtr;
size_t bufferSize;

#if defined(MSCCLPP_DEVICE_COMPILE)
#if defined(MSCCLPP_DEVICE_CUDA)
template <int NElemPerThread = 4, typename TVaule = float4, typename T = float>
MSCCLPP_DEVICE_INLINE void multimemLoad(TVaule& val, T* ptr) {
static_assert(NElemPerThread == 4, "Only support NElemPerThread == 4");
Expand Down Expand Up @@ -54,7 +54,7 @@ struct DeviceMulticastPointerDeviceHandle {
static_assert(dependentFalse<T>, "Not supported type");
}
};
#endif
#endif // defined(MSCCLPP_DEVICE_CUDA)
};

} // namespace mscclpp
Expand Down
12 changes: 10 additions & 2 deletions include/mscclpp/semaphore_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ struct Host2DeviceSemaphoreDeviceHandle {
/// @return true if the host has signaled.
MSCCLPP_DEVICE_INLINE bool poll() {
bool signaled = (atomicLoad(inboundSemaphoreId, memoryOrderAcquire) > (*expectedInboundSemaphoreId));
if (signaled) (*expectedInboundSemaphoreId) += 1;
if (signaled) {
(*expectedInboundSemaphoreId) += 1;
} else {
// TODO: MSRCHA-371
atomicStore(&inboundSemaphoreId[1], uint64_t{0}, memoryOrderRelaxed);
}
return signaled;
}

/// Wait for the host to signal.
MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 100000000) {
(*expectedInboundSemaphoreId) += 1;
POLL_MAYBE_JAILBREAK((atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId)),
// TODO: MSRCHA-371
POLL_MAYBE_JAILBREAK((atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))
? (atomicStore(&inboundSemaphoreId[1], uint64_t{0}, memoryOrderRelaxed), true)
: false,
maxSpinCount);
}
#endif // defined(MSCCLPP_DEVICE_COMPILE)
Expand Down
4 changes: 2 additions & 2 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
validateTransport(dst, remoteTransport());
uint64_t oldValue = *src;
*src = newValue;
uint64_t* dstPtr = (uint64_t*)dst.data();
uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);

MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr + dstOffset, src, sizeof(uint64_t), cudaMemcpyHostToDevice, stream_));
MSCCLPP_CUDATHROW(cudaMemcpyAsync(dstPtr, src, sizeof(uint64_t), cudaMemcpyHostToDevice, stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection atomic write: from %p to %p, %lu -> %lu", src, dstPtr + dstOffset, oldValue,
newValue);

Expand Down
3 changes: 2 additions & 1 deletion src/semaphore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ namespace mscclpp {

static NonblockingFuture<RegisteredMemory> setupInboundSemaphoreId(Communicator& communicator, Connection* connection,
void* localInboundSemaphoreId) {
// TODO: MSRCHA-371
auto localInboundSemaphoreIdsRegMem =
communicator.registerMemory(localInboundSemaphoreId, sizeof(uint64_t), connection->transport());
communicator.registerMemory(localInboundSemaphoreId, sizeof(uint64_t) * 2, connection->transport());
int remoteRank = communicator.remoteRankOf(*connection);
int tag = communicator.tagOf(*connection);
communicator.sendMemoryOnSetup(localInboundSemaphoreIdsRegMem, remoteRank, tag);
Expand Down
5 changes: 5 additions & 0 deletions test/mp_unit/proxy_channel_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,15 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) {
proxyService->stopProxy();
}

// TODO: MSRCHA-371
#if defined(MSCCLPP_DEVICE_CUDA)

TEST_F(ProxyChannelOneToOneTest, PacketPingPong) { testPacketPingPong(false); }

TEST_F(ProxyChannelOneToOneTest, PacketPingPongIb) { testPacketPingPong(true); }

TEST_F(ProxyChannelOneToOneTest, PacketPingPongPerf) { testPacketPingPongPerf(false); }

TEST_F(ProxyChannelOneToOneTest, PacketPingPongPerfIb) { testPacketPingPongPerf(true); }

#endif // defined(MSCCLPP_DEVICE_CUDA)
16 changes: 13 additions & 3 deletions test/nvls_test.cu
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include <stdio.h>

#if (USE_NVLS)
#include <cuda.h>
#include <cudaTypedefs.h>
#include <cuda_runtime.h>
#include <mpi.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
Expand Down Expand Up @@ -71,7 +73,6 @@ __global__ void testing(float* mc_ptr, int size, int myrank, int nranks) {
}

int main() {
#if (USE_NVLS)
int myrank, nranks;
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
Expand Down Expand Up @@ -199,5 +200,14 @@ int main() {
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
#endif // (USE_NVLS)
return 0;
}

#else // !(USE_NVLS)

int main() {
printf("This test requires NVLS to be enabled\n");
return 0;
}

#endif // !(USE_NVLS)
Loading