Skip to content

Commit

Permalink
add loopbacks for allgather
Browse files Browse the repository at this point in the history
  • Loading branch information
yaeliyac committed Dec 13, 2024
1 parent 73651ea commit eecdebf
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/components/ec/cuda/ec_cuda_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ ucc_status_t ucc_cuda_executor_finalize(ucc_ee_executor_t *executor)
ucc_ec_cuda_executor_t *eee = ucc_derived_of(executor,
ucc_ec_cuda_executor_t);

ec_debug(&ucc_ec_cuda.super, "executor free, eee: %p", eee);
//ec_debug(&ucc_ec_cuda.super, "executor free, eee: %p", eee);
ucc_assert(eee->state == UCC_EC_CUDA_EXECUTOR_INITIALIZED);
ucc_mpool_put(eee);

Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/allgather/allgather.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#define ALLGATHER_H_
#include "../tl_ucp.h"
#include "../tl_ucp_coll.h"
#include "tl_ucp_sendrecv.h"


enum {
UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL,
Expand Down Expand Up @@ -36,6 +38,7 @@ static inline int ucc_tl_ucp_allgather_alg_from_str(const char *str)
return i;
}


ucc_status_t ucc_tl_ucp_allgather_init(ucc_tl_ucp_task_t *task);

/* Ring */
Expand Down
2 changes: 1 addition & 1 deletion src/components/tl/ucp/allgather/allgather_bruck.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,4 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task)
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}
}
64 changes: 45 additions & 19 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "coll_patterns/sra_knomial.h"
#include "utils/ucc_math.h"
#include "utils/ucc_coll_utils.h"
#include "allgather.h"



#define SAVE_STATE(_phase) \
do { \
Expand Down Expand Up @@ -52,6 +55,7 @@
* As such allgather must keep to this ranking to be aligned with scatter.
*/


void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
{
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task,
Expand All @@ -78,9 +82,17 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_status_t status;
size_t extra_count;

EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test",
task->allgather_kn.etask);
int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!use_cuda) {
if (UCC_INPROGRESS == ucc_tl_ucp_test(task)){
return;
}
}

if (use_cuda) EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask);
task->allgather_kn.etask = NULL;

UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
if (KN_NODE_EXTRA == node_type) {
peer = ucc_knomial_pattern_get_proxy(p, rank);
Expand Down Expand Up @@ -197,6 +209,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)

ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
{

ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task,
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
Expand All @@ -214,6 +227,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_ee_executor_t *exec;
void *rbuf;

int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
task->allgather_kn.etask = NULL;
Expand All @@ -225,21 +240,29 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_dt_size(args->dst.info.datatype);
rbuf = args->dst.info.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
if (use_cuda) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
} else {
/*Loopback*/
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->src.info.buffer, args->src.info.count * ucc_dt_size(args->src.info.datatype),
args->src.info.mem_type, rank, team, task),task, out);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.count * ucc_dt_size(args->src.info.datatype),
args->dst.info.mem_type, rank, team, task),task, out);
}
}
} else if (ct == UCC_COLL_TYPE_ALLGATHERV) {
Expand Down Expand Up @@ -284,6 +307,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
task->allgather_kn.sbuf = PTR_OFFSET(rbuf, offset);

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}

ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
Expand All @@ -293,8 +318,9 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t *task;
ucc_sbgp_t *sbgp;

task = ucc_tl_ucp_init_task(coll_args, team);
//ucc_coll_args_t *args = &coll_args->args;

if (tl_team->cfg.use_reordering &&
coll_args->args.coll_type == UCC_COLL_TYPE_ALLREDUCE) {
sbgp = ucc_topo_get_sbgp(tl_team->topo, UCC_SBGP_FULL_HOST_ORDERED);
Expand All @@ -306,6 +332,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
task->super.post = ucc_tl_ucp_allgather_knomial_start;
task->super.progress = ucc_tl_ucp_allgather_knomial_progress;
*task_h = &task->super;

return UCC_OK;
}

Expand All @@ -316,7 +343,6 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_rank_t size = UCC_TL_TEAM_SIZE(tl_team);
ucc_kn_radix_t radix;

radix = ucc_min(UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allgather_kn_radix, size);
return ucc_tl_ucp_allgather_knomial_init_r(coll_args, team, task_h, radix);
}
18 changes: 14 additions & 4 deletions src/components/tl/ucp/allgather/allgather_neighbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,26 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task)
ucc_rank_t neighbor;
void *tmprecv, *tmpsend;


UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_neighbor_start",
0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if (use_cuda) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, err);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, err);
}
}

if (trank % 2) {
neighbor = (trank - 1 + tsize) % tsize;
} else {
Expand All @@ -175,4 +183,6 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task)
task, out);
out:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
err:
return task->super.status;
}
37 changes: 28 additions & 9 deletions src/components/tl/ucp/allgather/allgather_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,32 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task)
if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
return;
}

sendto = ucc_ep_map_eval(task->subset.map, (trank + 1) % tsize);
recvfrom = ucc_ep_map_eval(task->subset.map, (trank - 1 + tsize) % tsize);


uint32_t use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
int iter = use_cuda ? tsize - 1 : tsize;

while (task->tagged.send_posted < iter) {

step = use_cuda ? task->tagged.send_posted : task->tagged.send_posted - 1;

while (task->tagged.send_posted < tsize - 1) {
step = task->tagged.send_posted;
sblock = task->allgather_ring.get_send_block(&task->subset, trank,
tsize, step);
rblock = task->allgather_ring.get_recv_block(&task->subset, trank,
tsize, step);
buf = PTR_OFFSET(rbuf, sblock * data_size);

UCPCHECK_GOTO(
ucc_tl_ucp_send_nb(buf, data_size, rmem, sendto, team, task),
task, out);
buf = PTR_OFFSET(rbuf, rblock * data_size);
UCPCHECK_GOTO(
ucc_tl_ucp_recv_nb(buf, data_size, rmem, recvfrom, team, task),
task, out);

if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
return;
}
Expand Down Expand Up @@ -93,17 +102,26 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task)
UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_ring_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize,
0);
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, 0);
if (use_cuda) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
ucc_rank_t rank = ucc_ep_map_eval(task->subset.map, trank);
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, rank, team, task),task, out);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, rank, team, task),task, out);
}
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}

ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task)
Expand All @@ -119,6 +137,7 @@ ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task)
if (!(task->flags & UCC_TL_UCP_TASK_FLAG_SUBSET)) {
if (team->cfg.use_reordering) {
sbgp = ucc_topo_get_sbgp(team->topo, UCC_SBGP_FULL_HOST_ORDERED);
//printf("reordering: rank %d chenged to %d\n", (int)task->subset.myrank, (int)sbgp->group_rank);
task->subset.myrank = sbgp->group_rank;
task->subset.map = sbgp->map;
}
Expand Down Expand Up @@ -147,4 +166,4 @@ ucc_status_t ucc_tl_ucp_allgather_ring_init(ucc_base_coll_args_t *coll_args,
}
*task_h = &task->super;
return UCC_OK;
}
}
17 changes: 13 additions & 4 deletions src/components/tl/ucp/allgather/allgather_sparbit.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,22 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task)
task->allgather_sparbit.i = 0; // setup iteration
task->allgather_sparbit.data_expected = 1;

int use_cuda = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if (use_cuda) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, out);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, out);
}
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}
4 changes: 4 additions & 0 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_kn_radix),
UCC_CONFIG_TYPE_UINT},

{"ALLGATHER_USE_CUDA", "1", "Flag weather to use mc cuda copy",
ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_use_cuda),
UCC_CONFIG_TYPE_UINT},

{"BCAST_KN_RADIX", "4", "Radix of the recursive-knomial bcast algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, bcast_kn_radix),
UCC_CONFIG_TYPE_UINT},
Expand Down
1 change: 1 addition & 0 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern ucc_tl_ucp_iface_t ucc_tl_ucp;
typedef struct ucc_tl_ucp_lib_config {
ucc_tl_lib_config_t super;
uint32_t kn_radix;
uint32_t allgather_use_cuda;
uint32_t fanin_kn_radix;
uint32_t fanout_kn_radix;
uint32_t barrier_kn_radix;
Expand Down
1 change: 1 addition & 0 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ typedef struct ucc_tl_ucp_task {
} alltoall_bruck;
char plugin_data[UCC_TL_UCP_TASK_PLUGIN_MAX_DATA];
};
//int count;
} ucc_tl_ucp_task_t;

typedef struct ucc_tl_ucp_schedule {
Expand Down
1 change: 1 addition & 0 deletions src/components/tl/ucp/tl_ucp_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *params,
if (UCC_OK != status) {
return status;
}
//printf("\n ------- use_cuda tl_ucp_lib.c:29 = %u ----------\n", self->cfg.allgather_use_cuda);

if (tl_ucp_config->kn_radix > 0) {
self->cfg.barrier_kn_radix = tl_ucp_config->kn_radix;
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/tl_ucp_service_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static ucc_rank_t ucc_tl_ucp_service_ring_get_recv_block(ucc_subset_t *subset,

static ucc_status_t ucc_tl_ucp_service_coll_start_executor(ucc_coll_task_t *task)
{
ucc_debug("<==================== install/ucc/src/components/tl/ucp/tl_ucp_service_coll.c:34 start executer ====================>");
ucc_ee_executor_params_t eparams;
ucc_status_t status;

Expand Down Expand Up @@ -143,6 +144,7 @@ ucc_status_t ucc_tl_ucp_service_allgather(ucc_base_team_t *team, void *sbuf,
ucc_subset_t subset,
ucc_coll_task_t **task_p)
{
ucc_debug("<==================== install/ucc/src/components/tl/ucp/tl_ucp_service_coll.c:146 ====================>");
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t *task = ucc_tl_ucp_get_task(tl_team);
uint32_t npolls =
Expand All @@ -167,6 +169,7 @@ ucc_status_t ucc_tl_ucp_service_allgather(ucc_base_team_t *team, void *sbuf,
ucc_status_t status;

status = ucc_coll_task_init(&task->super, &bargs, team);
ucc_debug("<==================== install/ucc/src/components/tl/ucp/tl_ucp_service_coll.c:171 ====================>");
if (status != UCC_OK) {
goto free_task;
}
Expand Down
2 changes: 2 additions & 0 deletions src/ucc/api/ucc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,8 @@ typedef struct ucc_coll_args {
int64_t stride;
uint64_t size;
} active_set;


} ucc_coll_args_t;

/**
Expand Down

0 comments on commit eecdebf

Please sign in to comment.