From 309f8f2db636c6920b40aeeaa94e8e3b6b45ef9a Mon Sep 17 00:00:00 2001
From: Binyang Li
Date: Tue, 9 Apr 2024 07:15:47 +0000
Subject: [PATCH] add executor test to test_mscclpp.py

---
 python/test/executor_test.py | 20 +-----------------
 python/test/test_mscclpp.py  | 41 +++++++++++++++++++++++++++++++++++-
 2 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/python/test/executor_test.py b/python/test/executor_test.py
index 395d089b8..c4cd0a87c 100644
--- a/python/test/executor_test.py
+++ b/python/test/executor_test.py
@@ -53,25 +53,7 @@ def bench_time(niters: int, func):
     buffer = cp.random.random(nelems).astype(cp.float16)
     sub_arrays = cp.split(buffer, MPI.COMM_WORLD.size)
     sendbuf = sub_arrays[MPI.COMM_WORLD.rank]
-
-    expected = cp.zeros_like(sendbuf)
-    for i in range(MPI.COMM_WORLD.size):
-        expected += sub_arrays[i]
-
-    stream = cp.cuda.Stream(non_blocking=True)
-    executor.execute(
-        MPI.COMM_WORLD.rank,
-        sendbuf.data.ptr,
-        sendbuf.data.ptr,
-        sendbuf.nbytes,
-        sendbuf.nbytes,
-        DataType.float16,
-        512,
-        execution_plan,
-        stream.ptr,
-    )
-    stream.synchronize()
-    assert cp.allclose(sendbuf, expected, atol=1e-3 * MPI.COMM_WORLD.size)
+    mscclpp_group.barrier()
 
     execution_time = bench_time(
         1000,
diff --git a/python/test/test_mscclpp.py b/python/test/test_mscclpp.py
index 4b3cb6ebf..f007718e3 100644
--- a/python/test/test_mscclpp.py
+++ b/python/test/test_mscclpp.py
@@ -12,7 +12,10 @@
 import pytest
 
 from mscclpp import (
+    DataType,
     EndpointConfig,
+    ExecutionPlan,
+    Executor,
     Fifo,
     Host2DeviceSemaphore,
     Host2HostSemaphore,
@@ -25,7 +28,7 @@
 import mscclpp.comm as mscclpp_comm
 from mscclpp.utils import KernelBuilder, pack
 
 from ._cpp import _ext
-from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group
+from .mscclpp_mpi import MpiGroup, parametrize_mpi_groups, mpi_group, N_GPUS_PER_NODE
 
 ethernet_interface_name = "eth0"
@@ -590,3 +593,39 @@ def test_nvls(mpi_group: MpiGroup):
     kernel()
     cp.cuda.runtime.deviceSynchronize()
     group.barrier()
+
+
+@parametrize_mpi_groups(2)
+@pytest.mark.parametrize("filename", ["allreduce.json", "allreduce_packet.json"])
+def test_executor(mpi_group: MpiGroup, filename: str):
+    if all_ranks_on_the_same_node(mpi_group) is False:
+        pytest.skip("algo does not support cross node")
+    project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+    mscclpp_group = mscclpp_comm.CommGroup(mpi_group.comm)
+    executor = Executor(mscclpp_group.communicator, N_GPUS_PER_NODE)
+    execution_plan = ExecutionPlan("allreduce_pairs", os.path.join(project_dir, "test", "execution-files", filename))
+
+    nelems = 1024 * 1024
+    cp.random.seed(42)
+    buffer = cp.random.random(nelems).astype(cp.float16)
+    sub_arrays = cp.split(buffer, mpi_group.comm.size)
+    sendbuf = sub_arrays[mpi_group.comm.rank]
+    expected = cp.zeros_like(sendbuf)
+    for i in range(mpi_group.comm.size):
+        expected += sub_arrays[i]
+    mscclpp_group.barrier()
+
+    stream = cp.cuda.Stream(non_blocking=True)
+    executor.execute(
+        mpi_group.comm.rank,
+        sendbuf.data.ptr,
+        sendbuf.data.ptr,
+        sendbuf.nbytes,
+        sendbuf.nbytes,
+        DataType.float16,
+        512,
+        execution_plan,
+        stream.ptr,
+    )
+    stream.synchronize()
+    assert cp.allclose(sendbuf, expected, atol=1e-3 * mpi_group.comm.size)