diff --git a/src/components/tl/cuda/Makefile.am b/src/components/tl/cuda/Makefile.am index e22796e6fa..65fb41ca1f 100644 --- a/src/components/tl/cuda/Makefile.am +++ b/src/components/tl/cuda/Makefile.am @@ -1,5 +1,5 @@ # -# Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # Copyright (c) Meta Platforms, Inc. and affiliates. 2022. # @@ -27,6 +27,11 @@ alltoallv = \ alltoallv/alltoallv.c \ alltoallv/alltoallv_ce.c +bcast = \ + bcast/bcast.h \ + bcast/bcast.c \ + bcast/bcast_linear.c + reduce_scatter = \ reduce_scatter/reduce_scatter.h \ reduce_scatter/reduce_scatter.c \ @@ -54,6 +59,7 @@ sources = \ $(allgatherv) \ $(alltoall) \ $(alltoallv) \ + $(bcast) \ $(reduce_scatter) \ $(reduce_scatterv) diff --git a/src/components/tl/cuda/allgather/allgather.c b/src/components/tl/cuda/allgather/allgather.c index 01996da4da..362191b3ac 100644 --- a/src/components/tl/cuda/allgather/allgather.c +++ b/src/components/tl/cuda/allgather/allgather.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -44,7 +44,7 @@ ucc_status_t ucc_tl_cuda_allgather_init(ucc_base_coll_args_t *coll_args, { ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); - if (ucc_tl_cuda_team_topo_is_fully_conntected(team->topo)) { + if (ucc_tl_cuda_team_topo_is_fully_connected(team->topo)) { return ucc_tl_cuda_allgather_linear_init(coll_args, tl_team, task_p); } else { return ucc_tl_cuda_allgather_ring_init(coll_args, tl_team, task_p); diff --git a/src/components/tl/cuda/allgather/allgather_linear.c b/src/components/tl/cuda/allgather/allgather_linear.c index ed228d1683..d0b416257b 100644 --- a/src/components/tl/cuda/allgather/allgather_linear.c +++ b/src/components/tl/cuda/allgather/allgather_linear.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -15,7 +15,7 @@ ucc_status_t ucc_tl_cuda_allgather_linear_init(ucc_base_coll_args_t *coll_args, ucc_tl_cuda_task_t *task; ucc_status_t status; - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_connected(team->topo) || UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/allgatherv/allgatherv.c b/src/components/tl/cuda/allgatherv/allgatherv.c index 5a8f78c481..4a73bbdf08 100644 --- a/src/components/tl/cuda/allgatherv/allgatherv.c +++ b/src/components/tl/cuda/allgatherv/allgatherv.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. 
*/ @@ -47,7 +47,7 @@ ucc_status_t ucc_tl_cuda_allgatherv_init(ucc_base_coll_args_t *coll_args, { ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); - if (ucc_tl_cuda_team_topo_is_fully_conntected(team->topo)) { + if (ucc_tl_cuda_team_topo_is_fully_connected(team->topo)) { return ucc_tl_cuda_allgatherv_linear_init(coll_args, tl_team, task_p); } else { return ucc_tl_cuda_allgatherv_ring_init(coll_args, tl_team, task_p); diff --git a/src/components/tl/cuda/allgatherv/allgatherv_linear.c b/src/components/tl/cuda/allgatherv/allgatherv_linear.c index 0fca5c6af6..9a8b5db140 100644 --- a/src/components/tl/cuda/allgatherv/allgatherv_linear.c +++ b/src/components/tl/cuda/allgatherv/allgatherv_linear.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -55,22 +55,6 @@ enum * other ranks to finish */ }; -static inline int get_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, - int step_id) -{ - ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); - - return sync->seq_num[step_id]; -} - -static inline void set_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, - int step, int step_id) -{ - ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); - - sync->seq_num[step_id] = step; -} - ucc_status_t ucc_tl_cuda_allgatherv_linear_finalize(ucc_coll_task_t *coll_task) { ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t); @@ -432,7 +416,7 @@ ucc_status_t ucc_tl_cuda_allgatherv_linear_init(ucc_base_coll_args_t *coll_args, ucc_tl_cuda_task_t *task; ucc_status_t status; - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_connected(team->topo) || UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/bcast/bcast.c b/src/components/tl/cuda/bcast/bcast.c new file mode 100644 index 0000000000..954cf86d9f --- /dev/null +++ b/src/components/tl/cuda/bcast/bcast.c @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "bcast.h" +#include "components/mc/ucc_mc.h" + +ucc_base_coll_alg_info_t + ucc_tl_cuda_bcast_algs[UCC_TL_CUDA_BCAST_ALG_LAST + 1] = { + [UCC_TL_CUDA_BCAST_ALG_LINEAR] = {.id = UCC_TL_CUDA_BCAST_ALG_LINEAR, + .name = "linear", + .desc = "linear bcast algorithm"}, + [UCC_TL_CUDA_BCAST_ALG_LAST] = {.id = 0, .name = NULL, .desc = NULL}}; + +ucc_status_t ucc_tl_cuda_bcast_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *tl_team, + ucc_coll_task_t **task_p) +{ + ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); + + if (ucc_tl_cuda_team_topo_is_fully_connected(team->topo)) { + return ucc_tl_cuda_bcast_linear_init(coll_args, tl_team, task_p); + } else { + return UCC_ERR_NOT_SUPPORTED; + } +} diff --git a/src/components/tl/cuda/bcast/bcast.h b/src/components/tl/cuda/bcast/bcast.h new file mode 100644 index 0000000000..5810bcc89d --- /dev/null +++ b/src/components/tl/cuda/bcast/bcast.h @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. 
+ */ + +#ifndef BCAST_H_ +#define BCAST_H_ + +#include "tl_cuda.h" +#include "tl_cuda_coll.h" + +enum +{ + UCC_TL_CUDA_BCAST_ALG_LINEAR, + UCC_TL_CUDA_BCAST_ALG_LAST +}; + +extern ucc_base_coll_alg_info_t + ucc_tl_cuda_bcast_algs[UCC_TL_CUDA_BCAST_ALG_LAST + 1]; + +#define UCC_TL_CUDA_BCAST_DEFAULT_ALG_SELECT_STR "bcast:cuda:@0" + +ucc_status_t ucc_tl_cuda_bcast_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *tl_team, + ucc_coll_task_t **task_p); + +ucc_status_t ucc_tl_cuda_bcast_linear_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *tl_team, + ucc_coll_task_t **task_p); + +static inline int ucc_tl_cuda_bcast_alg_from_str(const char *str) +{ + int i; + for (i = 0; i < UCC_TL_CUDA_BCAST_ALG_LAST; i++) { + if (0 == strcasecmp(str, ucc_tl_cuda_bcast_algs[i].name)) { + break; + } + } + return i; +} + +#endif diff --git a/src/components/tl/cuda/bcast/bcast_linear.c b/src/components/tl/cuda/bcast/bcast_linear.c new file mode 100644 index 0000000000..9b2915fa39 --- /dev/null +++ b/src/components/tl/cuda/bcast/bcast_linear.c @@ -0,0 +1,422 @@ +/** + * Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "bcast.h" + +enum { + // Barrier setup stages + STAGE_INIT_BAR_ROOT, // Initial stage for the root rank to identify and claim a free barrier + STAGE_FIND_BAR_PEER, // Stage where peer ranks wait while the root rank identifies a free barrier + + STAGE_SYNC, // Initialize the barrier and synchronize the segment required for the current task + STAGE_SETUP, // Verify that all ranks are aligned and have reached the barrier + // Stages specific to the root rank + STAGE_COPY, // Post copy task: copy data block from src to a scratch buffer + STAGE_WAIT_COPY, // The root waits for the completion of its copy operation + STAGE_WAIT_ALL, // The root rank waits until all other ranks have reached the same operational step + STAGE_WAIT_COMPLETION, // The root rank waits for all other ranks to complete the broadcast operation + // non-root + STAGE_WAIT_ROOT, // Wait while the root rank writes data to its scratch buffer + STAGE_CLIENT_COPY, // Initiate their own copy tasks after the root's operations + STAGE_CLIENT_COPY_WAIT, // Wait for the completion of the copy operation from the root's scratch buffer + STAGE_CLIENT_WAIT_COMPLETION, // Wait for the completion of algorithm on all ranks, global sync with root +}; + +static inline ucc_status_t +ucc_tl_cuda_bcast_linear_setup_start(ucc_tl_cuda_task_t *task) +{ + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + + set_rank_step(task, trank, 0, 0); // Initialize rank step tracking + ucc_memory_cpu_store_fence(); + // initiate barrier wait while all ranks set theirs steps to 0 + return ucc_tl_cuda_shm_barrier_start(UCC_TL_TEAM_RANK(team), task->bar); +} + +// Tests if setup is complete for a linear broadcast task +static inline ucc_status_t +ucc_tl_cuda_bcast_linear_setup_test(ucc_tl_cuda_task_t *task) +{ + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + return ucc_tl_cuda_shm_barrier_test(UCC_TL_TEAM_RANK(team), task->bar); +} + +// Returns the size of the scratch buffer used for data transfers +static inline size_t get_raw_scratch_size(ucc_tl_cuda_team_t *team) +{ + return UCC_TL_CUDA_TEAM_LIB(team)->cfg.scratch_size; +} + +// Posts a copy task to the CUDA executor +static inline ucc_status_t ecopy(void *dst, void *src, size_t size, + ucc_ee_executor_t *exec, + ucc_ee_executor_task_t **etask) +{ + ucc_ee_executor_task_args_t 
exec_args = {0}; + + exec_args.task_type = UCC_EE_EXECUTOR_TASK_COPY; + exec_args.copy.dst = dst; + exec_args.copy.src = src; + exec_args.copy.len = size; + return ucc_ee_executor_task_post(exec, &exec_args, etask); +} + +// Root rank searches for and claims a free barrier +static inline ucc_status_t root_find_free_barrier(ucc_tl_cuda_task_t *task) +{ + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent; + ucc_tl_cuda_shm_barrier_t *curr_bar; + int i; + ucc_status_t st; + + // Iterate over available barriers in active set pool to find a free one + for (i = 0; i < max_concurrent; ++i) { + curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i); + // try to set user specified tag to mark that this barrier is used by this task + if (ucc_atomic_cswap64(&curr_bar->tag, UCC_TL_CUDA_TAG_FREE, + task->bcast_linear.key) == UCC_TL_CUDA_TAG_FREE) { + ucc_debug("Acquire barrier: %p idx: %d marked with tag: %ld", + curr_bar, i, curr_bar->tag); + task->bar = curr_bar; + st = ucc_tl_cuda_shm_barrier_init_root( + task->subset.map.ep_num, task->subset.myrank, + task->bcast_linear.root, task->bar); + if (ucc_unlikely(st != UCC_OK)) { + ucc_error("failed to init root barrier"); + return UCC_ERR_NO_RESOURCE; + } + // Assign a collective ID (index of barrier) + task->coll_id = i + max_concurrent; + return UCC_OK; + } + } + // try next time + return UCC_ERR_NOT_FOUND; +} + +// Peer rank searches for a barrier claimed by the root +static inline ucc_status_t peer_find_free_barrier(ucc_tl_cuda_task_t *task) +{ + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + uint32_t max_concurrent = UCC_TL_CUDA_TEAM_LIB(team)->cfg.max_concurrent; + ucc_tl_cuda_shm_barrier_t *curr_bar; + int i; + ucc_status_t st; + + for (i = 0; i < max_concurrent; ++i) { + curr_bar = UCC_TL_CUDA_TEAM_BARRIER(team, max_concurrent + i); + // Check if the barrier is claimed by the task's root + if (curr_bar->tag == task->bcast_linear.key) { + task->bar = curr_bar; + st = ucc_tl_cuda_shm_barrier_init_root( + task->subset.map.ep_num, task->subset.myrank, + task->bcast_linear.root, task->bar); + if (ucc_unlikely(st != UCC_OK)) { + ucc_error("failed to init peer barrier"); + return UCC_ERR_NO_RESOURCE; + } + task->coll_id = i + max_concurrent; + return UCC_OK; + } + } + // try next time + return UCC_ERR_NOT_FOUND; +} + +static ucc_status_t +ucc_tl_cuda_bcast_linear_finalize(ucc_coll_task_t *coll_task) +{ + ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t); + + tl_trace(UCC_TASK_LIB(task), "finalizing task %p", task); + ucc_tl_cuda_task_put(task); + return UCC_OK; +} + +static void ucc_tl_cuda_bcast_linear_progress(ucc_coll_task_t *coll_task) +{ + ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t); + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + size_t half_scratch_size = get_raw_scratch_size(team) / 2; + ucc_rank_t tsize = UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task)) + ? (ucc_rank_t)task->subset.map.ep_num + : UCC_TL_TEAM_SIZE(team); + size_t chunk_size = + task->bcast_linear.step < task->bcast_linear.num_steps + ? 
ucc_min(half_scratch_size, task->bcast_linear.size) + : task->bcast_linear.size - + (task->bcast_linear.step - 1) * half_scratch_size; + size_t offset_buff = task->bcast_linear.step * half_scratch_size; + ucc_ee_executor_t *exec; + ucc_ee_executor_task_t *etask; + void *sbuf, *dbuf; + ucc_rank_t peer; + ucc_status_t st; + int i; + + task->super.status = UCC_INPROGRESS; + + st = ucc_coll_task_get_executor(&task->super, &exec); + if (ucc_unlikely(st != UCC_OK)) { + task->super.status = st; + return; + } + + switch (task->bcast_linear.stage) { + case STAGE_INIT_BAR_ROOT: + st = root_find_free_barrier(task); + if (st == UCC_OK) { + task->bcast_linear.stage = STAGE_SYNC; + } else if (st != UCC_ERR_NOT_FOUND) { + task->super.status = st; + } + // no free barriers found, try next time + return; + case STAGE_FIND_BAR_PEER: + st = peer_find_free_barrier(task); + if (st == UCC_OK) { + // barrier found, continue to next stages + task->bcast_linear.stage = STAGE_SYNC; + } else if (st != UCC_ERR_NOT_FOUND) { + task->super.status = st; + } + // no free barriers found by root, try next time + return; + case STAGE_SYNC: + if (ucc_tl_cuda_get_sync_root(task, task->bcast_linear.root) != UCC_OK) { + return; + } + task->bcast_linear.step = 0; + st = ucc_tl_cuda_bcast_linear_setup_start(task); + if (st != UCC_OK) { + task->super.status = st; + return; + } + task->bcast_linear.stage = STAGE_SETUP; + case STAGE_SETUP: + st = ucc_tl_cuda_bcast_linear_setup_test(task); + if (st != UCC_OK) { + task->super.status = st; + return; + } + if (trank == task->bcast_linear.root) { + task->bcast_linear.stage = STAGE_COPY; + } else { + task->bcast_linear.stage = STAGE_WAIT_ROOT; + } + default: + break; + } + + if (trank == task->bcast_linear.root) { + // Root scenario + // fall-through between cases is intentional + switch (task->bcast_linear.stage) { + case STAGE_COPY: + // copy from src buffer to scratch + dbuf = PTR_OFFSET(TASK_SCRATCH(task, trank), + task->bcast_linear.step % 2 * half_scratch_size); + sbuf = PTR_OFFSET(task->bcast_linear.sbuf, offset_buff); + st = ecopy(dbuf, sbuf, chunk_size, exec, + &task->bcast_linear.exec_task); + if (st != UCC_OK) { + ucc_error("failed to post ecopy task"); + task->super.status = st; + return; + } + task->bcast_linear.stage = STAGE_WAIT_COPY; + case STAGE_WAIT_COPY: + etask = task->bcast_linear.exec_task; + ucc_assert(NULL != etask); + st = ucc_ee_executor_task_test(etask); + if (st != UCC_OK) { + return; // not ready + } + ucc_ee_executor_task_finalize(etask); + task->bcast_linear.exec_task = NULL; + // signal others + ++task->bcast_linear.step; + set_rank_step(task, task->bcast_linear.root, + task->bcast_linear.step, 0); + task->bcast_linear.stage = STAGE_WAIT_ALL; + case STAGE_WAIT_ALL: + for (i = 0; i < tsize; ++i) { + if (UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task))) { + // eval phys rank from virt + peer = ucc_ep_map_eval(task->subset.map, i); + } else { + peer = i; + } + // need to wait until all ranks complete step - 1, because of double buffering + if (get_rank_step(task, peer, 0) < + task->bcast_linear.step - 1) { + // rank is not ready, lets wait + return; + } + } + task->bcast_linear.stage = STAGE_COPY; + if (task->bcast_linear.step < task->bcast_linear.num_steps) { + // go to next iteration + task->bcast_linear.stage = STAGE_COPY; + return; + } + // finish + st = ucc_tl_cuda_shm_barrier_start(trank, task->bar); + if (ucc_unlikely(st != UCC_OK)) { + ucc_error("failed to start barrier from root rank"); + task->super.status = st; + return; + } + task->bcast_linear.stage = 
STAGE_WAIT_COMPLETION; + case STAGE_WAIT_COMPLETION: + st = ucc_tl_cuda_shm_barrier_test(trank, task->bar); + if (st != UCC_OK) { + // peers still working, lets check next time + task->super.status = st; + return; + } + // set barrier free to unlock others, this is roots responsibility + ucc_debug("Release bar: %p with tag: %ld", task->bar, + task->bar->tag); + task->bar->tag = UCC_TL_CUDA_TAG_FREE; + ucc_tl_cuda_put_sync_root(task, task->bcast_linear.root); + task->super.status = UCC_OK; + break; + default: + ucc_assert(0); + break; + } + } else { + // clients + // fall-through between cases is intentional + switch (task->bcast_linear.stage) { + case STAGE_WAIT_ROOT: + if (get_rank_step(task, task->bcast_linear.root, 0) > + task->bcast_linear.step) { + task->bcast_linear.stage = STAGE_CLIENT_COPY; + break; + } else { + return; + } + case STAGE_CLIENT_COPY: + // need to copy from root's scratch buffer + dbuf = PTR_OFFSET(task->bcast_linear.sbuf, offset_buff); + sbuf = PTR_OFFSET(TASK_SCRATCH(task, task->bcast_linear.root), + task->bcast_linear.step % 2 * chunk_size); + st = ecopy(dbuf, sbuf, chunk_size, exec, + &task->bcast_linear.exec_task); + if (st != UCC_OK) { + ucc_error("failed to post ecopy task at client"); + task->super.status = st; + return; + } + task->bcast_linear.stage = STAGE_CLIENT_COPY_WAIT; + case STAGE_CLIENT_COPY_WAIT: + etask = task->bcast_linear.exec_task; + ucc_assert(NULL != etask); + st = ucc_ee_executor_task_test(etask); + if (st != UCC_OK) { + return; // executor task is not ready + } + ucc_ee_executor_task_finalize(etask); + task->bcast_linear.exec_task = NULL; + ++task->bcast_linear.step; + set_rank_step(task, trank, task->bcast_linear.step, 0); + if (task->bcast_linear.step < task->bcast_linear.num_steps) { + task->bcast_linear.stage = STAGE_WAIT_ROOT; + return; + } + // start barrier to sync with root + task->bcast_linear.stage = STAGE_CLIENT_WAIT_COMPLETION; + st = ucc_tl_cuda_shm_barrier_start(trank, task->bar); + if (ucc_unlikely(st != UCC_OK)) { + ucc_error("failed to start barrier from peer rank"); + task->super.status = st; + return; + } + break; + case STAGE_CLIENT_WAIT_COMPLETION: + st = ucc_tl_cuda_shm_barrier_test(trank, task->bar); + if (st != UCC_OK) { + // someone still working, lets check next time + task->super.status = st; + return; + } + task->super.status = UCC_OK; + break; + default: + ucc_assert(0); + break; + } + } +} + +static ucc_status_t ucc_tl_cuda_bcast_linear_start(ucc_coll_task_t *coll_task) +{ + ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t); + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + ucc_coll_args_t *args = &TASK_ARGS(task); + ucc_datatype_t dt = task->bcast_linear.dt; + size_t half_scratch_size = get_raw_scratch_size(team) / 2; + + task->bcast_linear.stage = STAGE_SYNC; + + // in case of active set bcast we need to do additional steps to find free barriers + if (UCC_COLL_ARGS_ACTIVE_SET(&TASK_ARGS(task))) { + task->bcast_linear.stage = + UCC_TL_TEAM_RANK(team) == task->bcast_linear.root + ? 
STAGE_INIT_BAR_ROOT + : STAGE_FIND_BAR_PEER; + } + + task->bcast_linear.size = ucc_dt_size(dt) * args->src.info.count; + task->bcast_linear.num_steps = + ucc_div_round_up(task->bcast_linear.size, half_scratch_size); + + ucc_debug("bcast linear dt: %s, buffer size: %ld, num_steps: %d", + ucc_datatype_str(dt), task->bcast_linear.size, + task->bcast_linear.num_steps); + + task->bcast_linear.sbuf = args->src.info.buffer; + task->bcast_linear.step = 0; + + return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +} + +ucc_status_t ucc_tl_cuda_bcast_linear_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *tl_team, + ucc_coll_task_t **task_p) +{ + ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); + ucc_tl_cuda_task_t *task; + ucc_status_t status; + + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_connected(team->topo) || + UCC_TL_TEAM_SIZE(team) - 1 > + UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { + return UCC_ERR_NOT_SUPPORTED; + } + + status = ucc_tl_cuda_task_init(coll_args, team, &task); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } + + task->bcast_linear.root = coll_args->args.root; + task->bcast_linear.dt = coll_args->args.src.info.datatype; + task->bcast_linear.sbuf = coll_args->args.src.info.buffer; + + task->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR; + task->super.post = ucc_tl_cuda_bcast_linear_start; + task->super.progress = ucc_tl_cuda_bcast_linear_progress; + task->super.finalize = ucc_tl_cuda_bcast_linear_finalize; + + *task_p = &task->super; + return UCC_OK; +} diff --git a/src/components/tl/cuda/reduce_scatter/reduce_scatter.c b/src/components/tl/cuda/reduce_scatter/reduce_scatter.c index 468fd68338..237005c95b 100644 --- a/src/components/tl/cuda/reduce_scatter/reduce_scatter.c +++ b/src/components/tl/cuda/reduce_scatter/reduce_scatter.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -48,7 +48,7 @@ ucc_status_t ucc_tl_cuda_reduce_scatter_init(ucc_base_coll_args_t *coll_args, { ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); - if (ucc_tl_cuda_team_topo_is_fully_conntected(team->topo)) { + if (ucc_tl_cuda_team_topo_is_fully_connected(team->topo)) { return ucc_tl_cuda_reduce_scatter_linear_init(coll_args, tl_team, task_p); } else { diff --git a/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c b/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c index 46efbdb051..36801ce1d8 100644 --- a/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c +++ b/src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. 
*/ @@ -19,7 +19,7 @@ ucc_status_t ucc_tl_cuda_reduce_scatter_linear_init(ucc_base_coll_args_t *coll_a return UCC_ERR_NOT_SUPPORTED; } - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_connected(team->topo) || UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/reduce_scatterv/reduce_scatterv.c b/src/components/tl/cuda/reduce_scatterv/reduce_scatterv.c index d85e2c8dd3..eef433cdbb 100644 --- a/src/components/tl/cuda/reduce_scatterv/reduce_scatterv.c +++ b/src/components/tl/cuda/reduce_scatterv/reduce_scatterv.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -51,7 +51,7 @@ ucc_status_t ucc_tl_cuda_reduce_scatterv_init(ucc_base_coll_args_t *coll_args, { ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); - if (ucc_tl_cuda_team_topo_is_fully_conntected(team->topo)) { + if (ucc_tl_cuda_team_topo_is_fully_connected(team->topo)) { return ucc_tl_cuda_reduce_scatterv_linear_init(coll_args, tl_team, task_p); } else { diff --git a/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c b/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c index 6a1ec5b22c..56e4e2204c 100644 --- a/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c +++ b/src/components/tl/cuda/reduce_scatterv/reduce_scatterv_linear.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -59,22 +59,6 @@ enum * other ranks to finish */ }; -static inline int get_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, - int step_id) -{ - ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); - - return sync->seq_num[step_id]; -} - -static inline void set_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, - int step, int step_id) -{ - ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); - - sync->seq_num[step_id] = step; -} - ucc_status_t ucc_tl_cuda_reduce_scatterv_linear_finalize(ucc_coll_task_t *coll_task) { @@ -448,7 +432,7 @@ ucc_tl_cuda_reduce_scatterv_linear_init(ucc_base_coll_args_t *coll_args, return UCC_ERR_NOT_SUPPORTED; } - if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_conntected(team->topo) || + if (ucc_unlikely(!ucc_tl_cuda_team_topo_is_fully_connected(team->topo) || UCC_TL_TEAM_SIZE(team) - 1 > UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/src/components/tl/cuda/tl_cuda.c b/src/components/tl/cuda/tl_cuda.c index 98dccf26bf..842db59c72 100644 --- a/src/components/tl/cuda/tl_cuda.c +++ b/src/components/tl/cuda/tl_cuda.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. 
*/ @@ -9,6 +9,7 @@ #include "components/mc/base/ucc_mc_base.h" #include "allgather/allgather.h" #include "allgatherv/allgatherv.h" +#include "bcast/bcast.h" #include "reduce_scatter/reduce_scatter.h" #include "reduce_scatterv/reduce_scatterv.h" @@ -93,6 +94,8 @@ __attribute__((constructor)) static void tl_cuda_iface_init(void) ucc_tl_cuda_allgather_algs; ucc_tl_cuda.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_ALLGATHERV)] = ucc_tl_cuda_allgatherv_algs; + ucc_tl_cuda.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_BCAST)] = + ucc_tl_cuda_bcast_algs; ucc_tl_cuda.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_REDUCE_SCATTER)] = ucc_tl_cuda_reduce_scatter_algs; ucc_tl_cuda.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_REDUCE_SCATTERV)] = diff --git a/src/components/tl/cuda/tl_cuda.h b/src/components/tl/cuda/tl_cuda.h index 792100c80c..9742ac8ba2 100644 --- a/src/components/tl/cuda/tl_cuda.h +++ b/src/components/tl/cuda/tl_cuda.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * Copyright (c) Meta Platforms, Inc. and affiliates. 2022. * * See file LICENSE for terms. @@ -12,6 +12,7 @@ #include "components/tl/ucc_tl_log.h" #include "components/mc/ucc_mc.h" #include "utils/ucc_mpool.h" +#include "utils/ucc_datastruct.h" #include "tl_cuda_ep_hash.h" #include "tl_cuda_topo.h" #include "tl_cuda_team_topo.h" @@ -27,6 +28,7 @@ #define UCC_TL_CUDA_SUPPORTED_COLLS \ (UCC_COLL_TYPE_ALLTOALL | UCC_COLL_TYPE_ALLTOALLV | \ UCC_COLL_TYPE_ALLGATHER | UCC_COLL_TYPE_ALLGATHERV | \ + UCC_COLL_TYPE_BCAST | \ UCC_COLL_TYPE_REDUCE_SCATTER | UCC_COLL_TYPE_REDUCE_SCATTERV) #define UCC_TL_CUDA_TEAM_LIB(_team) \ @@ -72,7 +74,7 @@ extern ucc_tl_cuda_iface_t ucc_tl_cuda; typedef struct ucc_tl_cuda_lib_config { ucc_tl_lib_config_t super; - uint32_t max_concurrent; + uint32_t max_concurrent; // Maximum number of tasks that can be progressed simultaneously. size_t scratch_size; unsigned long allgather_ring_max_rings; uint32_t allgather_ring_num_chunks; @@ -104,9 +106,12 @@ UCC_CLASS_DECLARE(ucc_tl_cuda_context_t, const ucc_base_context_params_t *, typedef uint32_t ucc_tl_cuda_sync_state_t; +#define UCC_TL_CUDA_TAG_FREE 0xFFFFFFFFFFFFFFFF + typedef struct ucc_tl_cuda_shm_barrier { ucc_rank_t size; ucc_rank_t count; + uint64_t tag; int sense; ucc_status_t state[UCC_TL_CUDA_MAX_PEERS]; int local_sense[UCC_TL_CUDA_MAX_PEERS]; @@ -152,13 +157,15 @@ typedef struct ucc_tl_cuda_scratch { ucc_tl_cuda_mem_info_t rem_info[UCC_TL_CUDA_MAX_PEERS]; } ucc_tl_cuda_scratch_t; +// Team represents a communicator created within the CUDA context, typically using NVLink for inter-GPU communication typedef struct ucc_tl_cuda_team { ucc_tl_team_t super; - uint32_t seq_num; + uint32_t seq_num; // Counter for the number of launched collective tasks for this team + uint32_t seq_num_active_set; // Counter for tasks in the active set (subset of tasks requiring special handling) ucc_tl_cuda_team_topo_t *topo; - ucc_tl_cuda_sync_t *sync; - ucc_tl_cuda_sync_state_t *sync_state; - ucc_tl_cuda_shm_barrier_t *bar; + ucc_tl_cuda_sync_t *sync; // Pointer to shared memory segment for synchronization + ucc_tl_cuda_sync_state_t *sync_state; // Tracks the task currently using the sync segment of shared memory, if free - 0 + ucc_tl_cuda_shm_barrier_t *bar; // Pointer to the first barrier in an array of size [0; 2 * max_concurrent]. 
First max_concurrent barriers are for normal mode, the second one for active set mode ucc_tl_cuda_scratch_t scratch; cudaStream_t stream; ucc_tl_cuda_rank_id_t *ids; @@ -169,12 +176,14 @@ typedef struct ucc_tl_cuda_team { UCC_CLASS_DECLARE(ucc_tl_cuda_team_t, ucc_base_context_t *, const ucc_base_team_params_t *); +// Task represents a collective operation that runs in the CUDA context, typically using NVLink for inter-GPU communication typedef struct ucc_tl_cuda_task ucc_tl_cuda_task_t; struct ucc_tl_cuda_task { ucc_coll_task_t super; - uint32_t seq_num; - uint32_t coll_id; - ucc_tl_cuda_shm_barrier_t *bar; + uint32_t seq_num; // Sequential identifier for each task started within the team + uint32_t coll_id; // Index of the collective task in flight, within the range [0; max_concurrent) + ucc_tl_cuda_shm_barrier_t *bar; // Pointer to the reserved barrier for this task in the CUDA team + ucc_subset_t subset; // Mapping information for the active set, if it is present union { struct { int stage; @@ -224,6 +233,17 @@ struct ucc_tl_cuda_task { size_t (*get_offset)(const ucc_tl_cuda_task_t *task, ucc_rank_t block); } allgatherv_linear; + struct { + int stage; + int step; + void *sbuf; + ucc_datatype_t dt; + ucc_rank_t root; + size_t size; + int num_steps; + ucc_ee_executor_task_t *exec_task; + uint64_t key; // This is mix of user provided tag, root and peer to be unique for each task, algorithm uses it to mark barrier as used + } bcast_linear; struct { int stage; int num_frags; diff --git a/src/components/tl/cuda/tl_cuda_coll.c b/src/components/tl/cuda/tl_cuda_coll.c index 5d01cc1a94..71325dc826 100644 --- a/src/components/tl/cuda/tl_cuda_coll.c +++ b/src/components/tl/cuda/tl_cuda_coll.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. 
*/ @@ -9,6 +9,7 @@ #include "alltoallv/alltoallv.h" #include "allgather/allgather.h" #include "allgatherv/allgatherv.h" +#include "bcast/bcast.h" #include "reduce_scatter/reduce_scatter.h" #include "reduce_scatterv/reduce_scatterv.h" #include "utils/arch/cpu.h" @@ -35,6 +36,7 @@ const char * ucc_tl_cuda_default_alg_select_str[UCC_TL_CUDA_N_DEFAULT_ALG_SELECT_STR] = { UCC_TL_CUDA_ALLGATHER_DEFAULT_ALG_SELECT_STR, UCC_TL_CUDA_ALLGATHERV_DEFAULT_ALG_SELECT_STR, + UCC_TL_CUDA_BCAST_DEFAULT_ALG_SELECT_STR, UCC_TL_CUDA_REDUCE_SCATTER_DEFAULT_ALG_SELECT_STR, UCC_TL_CUDA_REDUCE_SCATTERV_DEFAULT_ALG_SELECT_STR}; @@ -78,6 +80,8 @@ ucc_status_t ucc_tl_cuda_coll_init(ucc_base_coll_args_t *coll_args, return ucc_tl_cuda_allgather_init(coll_args, team, task_h); case UCC_COLL_TYPE_ALLGATHERV: return ucc_tl_cuda_allgatherv_init(coll_args, team, task_h); + case UCC_COLL_TYPE_BCAST: + return ucc_tl_cuda_bcast_init(coll_args, team, task_h); case UCC_COLL_TYPE_REDUCE_SCATTER: return ucc_tl_cuda_reduce_scatter_init(coll_args, team, task_h); case UCC_COLL_TYPE_REDUCE_SCATTERV: @@ -89,6 +93,19 @@ ucc_status_t ucc_tl_cuda_coll_init(ucc_base_coll_args_t *coll_args, } } +ucc_status_t ucc_tl_cuda_shm_barrier_init_root(ucc_rank_t size, ucc_rank_t rank, ucc_rank_t root, + ucc_tl_cuda_shm_barrier_t *barrier) +{ + if (rank == root) { + barrier->size = size; + barrier->count = 0; + barrier->sense = 0; + } + barrier->state[rank] = UCC_OK; + barrier->local_sense[rank] = 1; + return UCC_OK; +} + ucc_status_t ucc_tl_cuda_shm_barrier_init(ucc_rank_t size, ucc_rank_t rank, ucc_tl_cuda_shm_barrier_t *barrier) { @@ -134,6 +151,8 @@ static inline int alg_id_from_str(ucc_coll_type_t coll_type, const char *str) return ucc_tl_cuda_allgather_alg_from_str(str); case UCC_COLL_TYPE_ALLGATHERV: return ucc_tl_cuda_allgatherv_alg_from_str(str); + case UCC_COLL_TYPE_BCAST: + return ucc_tl_cuda_bcast_alg_from_str(str); default: break; } @@ -187,6 +206,16 @@ ucc_status_t ucc_tl_cuda_alg_id_to_init(int alg_id, const char *alg_id_str, break; }; break; + case UCC_COLL_TYPE_BCAST: + switch (alg_id) { + case UCC_TL_CUDA_BCAST_ALG_LINEAR: + *init = ucc_tl_cuda_bcast_linear_init; + break; + default: + status = UCC_ERR_INVALID_PARAM; + break; + }; + break; case UCC_COLL_TYPE_REDUCE_SCATTER: switch (alg_id) { case UCC_TL_CUDA_REDUCE_SCATTER_ALG_AUTO: diff --git a/src/components/tl/cuda/tl_cuda_coll.h b/src/components/tl/cuda/tl_cuda_coll.h index 8b15cdf249..328db791ab 100644 --- a/src/components/tl/cuda/tl_cuda_coll.h +++ b/src/components/tl/cuda/tl_cuda_coll.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. 
*/ @@ -10,7 +10,7 @@ #include "tl_cuda.h" #include "components/mc/ucc_mc.h" -#define UCC_TL_CUDA_N_DEFAULT_ALG_SELECT_STR 4 +#define UCC_TL_CUDA_N_DEFAULT_ALG_SELECT_STR 5 extern const char *ucc_tl_cuda_default_alg_select_str[UCC_TL_CUDA_N_DEFAULT_ALG_SELECT_STR]; @@ -50,6 +50,19 @@ static inline void ucc_tl_cuda_task_reset(ucc_tl_cuda_task_t *task) task->super.status = UCC_INPROGRESS; } +ucc_status_t ucc_tl_cuda_shm_barrier_init_root(ucc_rank_t size, ucc_rank_t rank, ucc_rank_t root, + ucc_tl_cuda_shm_barrier_t *barrier); + +ucc_status_t ucc_tl_cuda_shm_barrier_init(ucc_rank_t size, ucc_rank_t rank, + ucc_tl_cuda_shm_barrier_t *barrier); + +ucc_status_t ucc_tl_cuda_shm_barrier_start(ucc_rank_t rank, + ucc_tl_cuda_shm_barrier_t *barrier); + +ucc_status_t ucc_tl_cuda_shm_barrier_test(ucc_rank_t rank, + ucc_tl_cuda_shm_barrier_t *barrier); + + static inline ucc_tl_cuda_task_t *ucc_tl_cuda_task_get(ucc_tl_cuda_team_t *team) { ucc_tl_cuda_context_t *ctx = UCC_TL_CUDA_TEAM_CTX(team); @@ -100,19 +113,39 @@ ucc_status_t ucc_tl_cuda_task_init(ucc_base_coll_args_t *coll_args, return status; } - task->seq_num = team->seq_num++; - task->coll_id = task->seq_num % max_concurrent; + /* active set */ + if (UCC_COLL_ARGS_ACTIVE_SET(&coll_args->args)) { + ucc_assert(coll_args->args.coll_type == UCC_COLL_TYPE_BCAST); + task->subset.map = ucc_active_set_to_ep_map(&coll_args->args); + task->subset.myrank = UCC_TL_TEAM_RANK(team); + // root + if (task->subset.myrank == coll_args->args.root) { + int peer = ucc_ep_map_eval(task->subset.map, 1); + task->bcast_linear.key = ((uint64_t)coll_args->args.tag << 32 | + coll_args->args.root << 16 | peer); + } else { + task->bcast_linear.key = + ((uint64_t)coll_args->args.tag << 32 | + coll_args->args.root << 16 | task->subset.myrank); + } + task->seq_num = team->seq_num_active_set++; + } else { + task->seq_num = team->seq_num++; + task->coll_id = task->seq_num % max_concurrent; + task->bar = TASK_BAR(task); + } *task_h = task; return UCC_OK; } -static inline ucc_status_t ucc_tl_cuda_get_sync(ucc_tl_cuda_task_t *task) +// check if segment for current task is available and barrier is available (completed from prev iteration) +static inline ucc_status_t ucc_tl_cuda_get_sync_root(ucc_tl_cuda_task_t *task, ucc_rank_t root) { ucc_tl_cuda_team_t *team = TASK_TEAM(task); volatile ucc_tl_cuda_sync_state_t *state = &team->sync_state[task->coll_id]; - if ((UCC_TL_TEAM_RANK(team) == 0) && (*state == 0)) { + if ((UCC_TL_TEAM_RANK(team) == root) && (*state == 0)) { *state = task->seq_num; } if ((*state != task->seq_num) || @@ -122,6 +155,31 @@ static inline ucc_status_t ucc_tl_cuda_get_sync(ucc_tl_cuda_task_t *task) return UCC_OK; } +static inline void ucc_tl_cuda_put_sync_root(ucc_tl_cuda_task_t *task, ucc_rank_t root) +{ + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + ucc_tl_cuda_sync_state_t *state = &team->sync_state[task->coll_id]; + + if (UCC_TL_TEAM_RANK(team) == root) { + ucc_assert(*state == task->seq_num); + *state = 0; + } +} + +static inline ucc_status_t ucc_tl_cuda_get_sync(ucc_tl_cuda_task_t *task) +{ + ucc_tl_cuda_team_t *team = TASK_TEAM(task); + volatile ucc_tl_cuda_sync_state_t *state = &team->sync_state[task->coll_id]; + + if ((UCC_TL_TEAM_RANK(team) == 0) && (*state == 0)) { + *state = task->seq_num; + } + if ((*state != task->seq_num) || (task->bar->state[UCC_TL_TEAM_RANK(team)] != UCC_OK)) { + return UCC_INPROGRESS; + } + return UCC_OK; +} + static inline void ucc_tl_cuda_put_sync(ucc_tl_cuda_task_t *task) { ucc_tl_cuda_team_t *team = TASK_TEAM(task); @@ -142,18 
+200,26 @@ ucc_status_t ucc_tl_cuda_coll_init(ucc_base_coll_args_t *coll_args, ucc_status_t ucc_tl_cuda_coll_finalize(ucc_coll_task_t *coll_task); -ucc_status_t ucc_tl_cuda_shm_barrier_init(ucc_rank_t size, ucc_rank_t rank, - ucc_tl_cuda_shm_barrier_t *barrier); - -ucc_status_t ucc_tl_cuda_shm_barrier_start(ucc_rank_t rank, - ucc_tl_cuda_shm_barrier_t *barrier); - -ucc_status_t ucc_tl_cuda_shm_barrier_test(ucc_rank_t rank, - ucc_tl_cuda_shm_barrier_t *barrier); - ucc_status_t ucc_tl_cuda_alg_id_to_init(int alg_id, const char *alg_id_str, ucc_coll_type_t coll_type, ucc_memory_type_t mem_type, ucc_base_coll_init_fn_t *init); +// common utils function for collectives: +static inline int get_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, + int step_id) +{ + ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); + + return sync->seq_num[step_id]; +} + +static inline void set_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, + int step, int step_id) +{ + ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); + + sync->seq_num[step_id] = step; +} + #endif diff --git a/src/components/tl/cuda/tl_cuda_ring.h b/src/components/tl/cuda/tl_cuda_ring.h index cc2d3c95db..13835df99d 100644 --- a/src/components/tl/cuda/tl_cuda_ring.h +++ b/src/components/tl/cuda/tl_cuda_ring.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -83,20 +83,4 @@ static inline ucc_rank_t get_recv_block(ucc_tl_cuda_team_t *team, return ring->ring[(ring->iring[trank] + tsize - step - 1) % tsize]; } -static inline int get_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, - int ring_id) -{ - ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); - - return sync->seq_num[ring_id]; -} - -static inline void set_rank_step(ucc_tl_cuda_task_t *task, ucc_rank_t rank, - int step, int ring_id) -{ - ucc_tl_cuda_sync_t *sync = TASK_SYNC(task, rank); - - sync->seq_num[ring_id] = step; -} - #endif diff --git a/src/components/tl/cuda/tl_cuda_team.c b/src/components/tl/cuda/tl_cuda_team.c index 64123a8cea..3b8a5fb253 100644 --- a/src/components/tl/cuda/tl_cuda_team.c +++ b/src/components/tl/cuda/tl_cuda_team.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -22,6 +22,8 @@ UCC_CLASS_INIT_FUNC(ucc_tl_cuda_team_t, ucc_base_context_t *tl_context, ucc_derived_of(tl_context, ucc_tl_cuda_context_t); ucc_tl_cuda_lib_t *lib = ucc_derived_of(tl_context->lib, ucc_tl_cuda_lib_t); + // Number of preallocated resource groups for tasks, including the active set. 
+ uint32_t resource_num = lib->cfg.max_concurrent * 2; ucc_tl_cuda_shm_barrier_t *bar; ucc_status_t status; int shm_id, i, j; @@ -45,7 +47,8 @@ UCC_CLASS_INIT_FUNC(ucc_tl_cuda_team_t, ucc_base_context_t *tl_context, return UCC_ERR_NO_MEMORY; } - scratch_size = lib->cfg.max_concurrent * lib->cfg.scratch_size; + // active set + scratch_size = resource_num * lib->cfg.scratch_size; status = CUDA_FUNC(cudaMalloc(&self->scratch.loc, scratch_size)); if (status != UCC_OK) { tl_error(tl_context->lib, "failed to alloc scratch buffer"); @@ -64,6 +67,7 @@ UCC_CLASS_INIT_FUNC(ucc_tl_cuda_team_t, ucc_base_context_t *tl_context, lib->cfg.max_concurrent + sizeof(ucc_tl_cuda_shm_barrier_t) * lib->cfg.max_concurrent + sizeof(ucc_tl_cuda_sync_state_t) * lib->cfg.max_concurrent; + ctrl_size *= 2; // active sets shm_id = -1; self->sync = (void*)-1; @@ -77,10 +81,12 @@ UCC_CLASS_INIT_FUNC(ucc_tl_cuda_team_t, ucc_base_context_t *tl_context, goto ids_exchange; } memset(self->sync, 0, ctrl_size); - self->bar = (ucc_tl_cuda_shm_barrier_t*)UCC_TL_CUDA_TEAM_SYNC(self, 0, - lib->cfg.max_concurrent); - for (i = 0; i < lib->cfg.max_concurrent; i++) { + self->bar = (ucc_tl_cuda_shm_barrier_t *)UCC_TL_CUDA_TEAM_SYNC( + self, 0, resource_num); + /* active set */ + for (i = 0; i < resource_num; i++) { bar = UCC_TL_CUDA_TEAM_BARRIER(self, i); + bar->tag = UCC_TL_CUDA_TAG_FREE; // mark as free for (j = 0; j < UCC_TL_TEAM_SIZE(self); j++) { status = ucc_tl_cuda_shm_barrier_init(UCC_TL_TEAM_SIZE(self), j, bar); @@ -109,6 +115,7 @@ UCC_CLASS_INIT_FUNC(ucc_tl_cuda_team_t, ucc_base_context_t *tl_context, tl_debug(tl_context->lib, "posted tl team: %p", self); self->seq_num = 1; + self->seq_num_active_set = 1; return UCC_OK; free_devices: @@ -127,6 +134,8 @@ UCC_CLASS_CLEANUP_FUNC(ucc_tl_cuda_team_t) { ucc_tl_cuda_lib_t *lib = ucc_derived_of(self->super.super.context->lib, ucc_tl_cuda_lib_t); + // Number of preallocated resource groups for tasks, including the active set. + uint32_t resource_num = lib->cfg.max_concurrent * 2; ucc_tl_cuda_sync_t *sync; cudaError_t st; int i, j; @@ -137,7 +146,7 @@ UCC_CLASS_CLEANUP_FUNC(ucc_tl_cuda_team_t) } if (self->ids) { if (self->sync != (void*)-1) { - for (i = 0; i < lib->cfg.max_concurrent; i++) { + for (i = 0; i < resource_num; i++) { for (j = 0; j < UCC_TL_TEAM_SIZE(self); j++) { if (j == UCC_TL_TEAM_RANK(self)) { continue; @@ -199,6 +208,8 @@ ucc_status_t ucc_tl_cuda_team_create_test(ucc_base_team_t *tl_team) ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t); ucc_tl_cuda_lib_t *lib = ucc_derived_of(tl_team->context->lib, ucc_tl_cuda_lib_t); + // Number of preallocated resource groups for tasks, including the active set. 
+ uint32_t resource_num = lib->cfg.max_concurrent * 2; ucc_status_t status; ucc_tl_cuda_sync_t *sync; ucc_tl_cuda_shm_barrier_t *bar; @@ -268,14 +279,14 @@ ucc_status_t ucc_tl_cuda_team_create_test(ucc_base_team_t *tl_team) goto exit_err; } team->bar = (ucc_tl_cuda_shm_barrier_t*)UCC_TL_CUDA_TEAM_SYNC(team, 0, - lib->cfg.max_concurrent); + resource_num); } team->sync_state = (ucc_tl_cuda_sync_state_t*)PTR_OFFSET(team->bar, sizeof(ucc_tl_cuda_shm_barrier_t) * - lib->cfg.max_concurrent); + resource_num); CUDA_CHECK_GOTO(cudaStreamCreateWithFlags(&team->stream, cudaStreamNonBlocking), exit_err, status); - for (i = 0; i < lib->cfg.max_concurrent; i++) { + for (i = 0; i < resource_num; i++) { sync = UCC_TL_CUDA_TEAM_SYNC(team, UCC_TL_TEAM_RANK(team), i); CUDA_CHECK_GOTO(cudaEventCreateWithFlags(&sync->ipc_event_local, cudaEventDisableTiming | @@ -303,7 +314,7 @@ ucc_status_t ucc_tl_cuda_team_create_test(ucc_base_team_t *tl_team) goto exit_err; } - for (i = 0; i < lib->cfg.max_concurrent; i++) { + for (i = 0; i < resource_num; i++) { sync = UCC_TL_CUDA_TEAM_SYNC(team, UCC_TL_TEAM_RANK(team), i); for (j = 0 ; j < UCC_TL_TEAM_SIZE(team); j++) { if (j == UCC_TL_TEAM_RANK(team)) { diff --git a/src/components/tl/cuda/tl_cuda_team_topo.h b/src/components/tl/cuda/tl_cuda_team_topo.h index 96b6d63a5b..a56b28bf21 100644 --- a/src/components/tl/cuda/tl_cuda_team_topo.h +++ b/src/components/tl/cuda/tl_cuda_team_topo.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -51,7 +51,7 @@ ucc_tl_cuda_team_topo_is_direct(const ucc_tl_team_t *team, } static inline int -ucc_tl_cuda_team_topo_is_fully_conntected(const ucc_tl_cuda_team_topo_t *topo) +ucc_tl_cuda_team_topo_is_fully_connected(const ucc_tl_cuda_team_topo_t *topo) { return topo->is_fully_connected; } diff --git a/src/components/tl/ucp/bcast/bcast_knomial.c b/src/components/tl/ucp/bcast/bcast_knomial.c index 1ca08893e3..62430024bf 100644 --- a/src/components/tl/ucp/bcast/bcast_knomial.c +++ b/src/components/tl/ucp/bcast/bcast_knomial.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -22,7 +22,7 @@ void ucc_tl_ucp_bcast_knomial_progress(ucc_coll_task_t *coll_task) ucc_rank_t size = (ucc_rank_t)task->subset.map.ep_num; uint32_t radix = task->bcast_kn.radix; - ucc_rank_t root = (uint32_t)TASK_ARGS(task).root; + ucc_rank_t root = (ucc_rank_t)TASK_ARGS(task).root; ucc_rank_t dist = task->bcast_kn.dist; void *buffer = TASK_ARGS(task).src.info.buffer; ucc_memory_type_t mtype = TASK_ARGS(task).src.info.mem_type; diff --git a/test/gtest/coll/test_bcast.cc b/test/gtest/coll/test_bcast.cc index 6d80816a31..5ceca3545f 100644 --- a/test/gtest/coll/test_bcast.cc +++ b/test/gtest/coll/test_bcast.cc @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * See file LICENSE for terms. 
*/ @@ -276,6 +276,8 @@ ucc_job_env_t two_step_env = {{"UCC_CL_HIER_TUNE", "bcast:@2step:0-inf:inf"}, {"UCC_CLS", "all"}}; ucc_job_env_t dbt_env = {{"UCC_TL_UCP_TUNE", "bcast:@dbt:0-inf:inf"}, {"UCC_CLS", "basic"}}; +ucc_job_env_t cuda_env = {{"UCC_TL_CUDA_TUNE", "bcast:cuda:@0"}, + {"UCC_CLS", "basic"}}; INSTANTIATE_TEST_CASE_P( , test_bcast_alg, ::testing::Combine( @@ -285,6 +287,10 @@ INSTANTIATE_TEST_CASE_P( #else ::testing::Values(UCC_MEMORY_TYPE_HOST), #endif +#ifdef HAVE_CUDA + ::testing::Values(two_step_env, dbt_env, cuda_env), //env +#else ::testing::Values(two_step_env, dbt_env), //env +#endif ::testing::Values(8, 65536), // count - ::testing::Values(15,16))); // n_procs + ::testing::Values(15, 16))); // n_procs
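
Notes on the technique used by the patch above (reviewer sketches, not part of the diff):

The linear bcast pipelines the message through the team scratch buffer, which it treats as two halves (double buffering): the root copies chunk `i` into half `i % 2` and publishes its step counter via `set_rank_step()`; peers copy the same half out of the root's scratch and publish their own counters, and the root only advances while every peer is at most one step behind (the `step - 1` check in `STAGE_WAIT_ALL`). The following standalone sketch models only that per-step arithmetic; the buffer sizes and helper names here are illustrative assumptions, not the UCC API.

```c
/* Sketch of the double-buffered chunking used by the linear bcast.
 * Compile with any C99 compiler; values are made up for illustration. */
#include <stdio.h>
#include <stddef.h>

static size_t min_sz(size_t a, size_t b) { return a < b ? a : b; }

int main(void)
{
    size_t scratch_size = 4096;             /* assumed scratch_size config value */
    size_t half         = scratch_size / 2; /* two halves -> double buffering   */
    size_t msg_size     = 5000;             /* bytes to broadcast               */
    size_t num_steps    = (msg_size + half - 1) / half; /* like ucc_div_round_up */

    for (size_t step = 0; step < num_steps; step++) {
        size_t offset = step * half;                     /* offset in the user buffer  */
        size_t chunk  = min_sz(half, msg_size - offset); /* clamp the final chunk      */
        size_t slot   = (step % 2) * half;               /* which scratch half is used */
        printf("step %zu: copy %zu bytes from offset %zu into scratch+%zu\n",
               step, chunk, offset, slot);
        /* Root publishes step + 1; peers copy the same slot out of the root's
         * scratch and publish their own step.  Because the two halves alternate,
         * the root may run at most one step ahead of the slowest peer. */
    }
    return 0;
}
```

For active-set broadcasts the patch reserves a second bank of `max_concurrent` barriers and has the root claim one by atomically swapping the barrier tag from `UCC_TL_CUDA_TAG_FREE` to a per-task key built from the user tag, root, and peer; peers then match on the same key. A minimal sketch of that claim pattern with C11 atomics is below (the patch itself uses `ucc_atomic_cswap64` on `bar->tag`; the names here are assumptions for illustration).

```c
/* Sketch of claiming a free barrier slot by tag, using C11 atomics. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define TAG_FREE UINT64_MAX /* mirrors UCC_TL_CUDA_TAG_FREE */

/* The root tries to claim a free slot by publishing its task key;
 * only one caller can win the compare-and-swap. */
static bool try_claim(_Atomic uint64_t *tag, uint64_t key)
{
    uint64_t expected = TAG_FREE;
    return atomic_compare_exchange_strong(tag, &expected, key);
}

int main(void)
{
    _Atomic uint64_t tag = TAG_FREE;
    /* key layout follows the patch: (tag << 32) | (root << 16) | peer */
    uint64_t key = ((uint64_t)42 << 32) | (0u << 16) | 1u;

    printf("first claim: %d, second claim: %d\n",
           try_claim(&tag, key), try_claim(&tag, key)); /* prints 1 then 0 */
    return 0;
}
```

Peers do not swap; they simply poll the tag bank for a slot whose tag equals their own key, which is why the root resets the tag to `UCC_TL_CUDA_TAG_FREE` only after the final shared-memory barrier completes.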