Skip to content

Commit

Permalink
Merge pull request #8014 from Artemy-Mellanox/topic/fuj_perf_old-1.12
Browse files Browse the repository at this point in the history
UCP/EP: Add eager multi-fragment overhead - v1.12.x
  • Loading branch information
yosefe authored Mar 8, 2022
2 parents b8dfe5b + af8c438 commit fdd015d
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 52 deletions.
160 changes: 116 additions & 44 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1696,10 +1696,11 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
return 1;
}

static void ucp_ep_config_calc_params(ucp_worker_h worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *lanes,
ucp_ep_thresh_params_t *params)
static ucs_status_t ucp_ep_config_calc_params(ucp_worker_h worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *lanes,
ucp_ep_thresh_params_t *params,
int eager)
{
ucp_context_h context = worker->context;
ucp_md_map_t md_map = 0;
Expand All @@ -1708,6 +1709,10 @@ static void ucp_ep_config_calc_params(ucp_worker_h worker,
ucp_md_index_t md_index;
uct_md_attr_t *md_attr;
uct_iface_attr_t *iface_attr;
ucp_worker_iface_t *wiface;
uct_perf_attr_t perf_attr;
ucs_status_t status;
double bw;
int i;

memset(params, 0, sizeof(*params));
Expand All @@ -1734,30 +1739,61 @@ static void ucp_ep_config_calc_params(ucp_worker_h worker,
}
}

params->bw += ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth);
bw = ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth);
if (eager && (iface_attr->cap.am.max_bcopy > 0)) {
/* Eager protocol has overhead for each fragment */
perf_attr.field_mask = UCT_PERF_ATTR_FIELD_OPERATION |
UCT_PERF_ATTR_FIELD_SEND_PRE_OVERHEAD |
UCT_PERF_ATTR_FIELD_SEND_POST_OVERHEAD;
perf_attr.operation = UCT_EP_OP_AM_ZCOPY;

wiface = ucp_worker_iface(worker, rsc_index);
status = uct_iface_estimate_perf(wiface->iface, &perf_attr);
if (status != UCS_OK) {
return status;
}

params->bw += 1.0 / ((1.0 / bw) + ((perf_attr.send_pre_overhead +
perf_attr.send_post_overhead) /
iface_attr->cap.am.max_bcopy));
} else {
params->bw += bw;
}
}

return UCS_OK;
}

static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *eager_lanes,
const ucp_lane_index_t *rndv_lanes,
int recv_reg_cost)
static ucs_status_t
ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *eager_lanes,
const ucp_lane_index_t *rndv_lanes,
int recv_reg_cost, size_t *thresh_p)
{
ucp_context_h context = worker->context;
double diff_percent = 1.0 - context->config.ext.rndv_perf_diff / 100.0;
ucp_ep_thresh_params_t eager_zcopy;
ucp_ep_thresh_params_t rndv;
double numerator, denumerator;
double numerator, denominator;
ucp_rsc_index_t eager_rsc_index;
uct_iface_attr_t *eager_iface_attr;
ucs_status_t status;
double rts_latency;

/* All formulas and descriptions are listed at
* https://github.com/openucx/ucx/wiki/Rendezvous-Protocol-threshold-for-multilane-mode */

ucp_ep_config_calc_params(worker, config, eager_lanes, &eager_zcopy);
ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv);
status = ucp_ep_config_calc_params(worker, config, eager_lanes,
&eager_zcopy, 1);
if (status != UCS_OK) {
return status;
}

status = ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv, 0);
if (status != UCS_OK) {
return status;
}

if ((eager_zcopy.bw == 0) || (rndv.bw == 0)) {
goto fallback;
Expand All @@ -1774,17 +1810,23 @@ static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
(2 * eager_zcopy.overhead) + rndv.overhead) -
eager_zcopy.reg_overhead - eager_zcopy.overhead;

denumerator = eager_zcopy.reg_growth +
denominator = eager_zcopy.reg_growth +
1.0 / ucs_min(eager_zcopy.bw, context->config.ext.bcopy_bw) -
diff_percent *
(rndv.reg_growth * (1 + recv_reg_cost) + 1.0 / rndv.bw);

if ((numerator > 0) && (denumerator > 0)) {
return ucs_max(numerator / denumerator, eager_iface_attr->cap.am.max_bcopy);
if ((numerator <= 0) || (denominator <= 0)) {
goto fallback;
}

*thresh_p = ucs_max(numerator / denominator,
eager_iface_attr->cap.am.max_bcopy);
return UCS_OK;

fallback:
return context->config.ext.rndv_thresh_fallback;
*thresh_p = context->config.ext.rndv_thresh_fallback;
return UCS_OK;

}

static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value,
Expand All @@ -1800,18 +1842,24 @@ static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value,
return thresh;
}

static size_t ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *rma_lanes)
static ucs_status_t
ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *rma_lanes,
ssize_t *thresh_p)
{
ucp_context_h context = worker->context;
double bcopy_bw = context->config.ext.bcopy_bw;
ucp_ep_thresh_params_t rma;
uct_md_attr_t *md_attr;
double numerator, denumerator;
double numerator, denominator;
double reg_overhead, reg_growth;
ucs_status_t status;

ucp_ep_config_calc_params(worker, config, rma_lanes, &rma);
status = ucp_ep_config_calc_params(worker, config, rma_lanes, &rma, 0);
if (status != UCS_OK) {
return status;
}

if (rma.bw == 0) {
goto fallback;
Expand All @@ -1827,14 +1875,18 @@ static size_t ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker,
}

numerator = reg_overhead;
denumerator = (1 / bcopy_bw) - reg_growth;
denominator = (1 / bcopy_bw) - reg_growth;

if (denumerator > 0) {
return numerator / denumerator;
if (denominator <= 0) {
goto fallback;
}

*thresh_p = numerator / denominator;
return UCS_OK;

fallback:
return SIZE_MAX;
*thresh_p = SIZE_MAX;
return UCS_OK;
}

static void ucp_ep_config_adjust_max_short(ssize_t *max_short,
Expand All @@ -1859,23 +1911,28 @@ static void ucp_ep_config_init_short_thresh(ucp_memtype_thresh_t *thresh)
thresh->memtype_off = -1;
}

static void ucp_ep_config_set_am_rndv_thresh(
static ucs_status_t ucp_ep_config_set_am_rndv_thresh(
ucp_worker_h worker, uct_iface_attr_t *iface_attr,
uct_md_attr_t *md_attr, ucp_ep_config_t *config, size_t min_rndv_thresh,
size_t max_rndv_thresh, ucp_rndv_thresh_t *thresh)
{
ucp_context_h context = worker->context;
size_t rndv_thresh, rndv_local_thresh, min_thresh;
ucs_status_t status;

ucs_assert(config->key.am_lane != UCP_NULL_LANE);
ucs_assert(config->key.lanes[config->key.am_lane].rsc_index != UCP_NULL_RESOURCE);

if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
/* auto - Make UCX calculate the AM rndv threshold on its own.*/
rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
config->key.am_bw_lanes,
0);
status = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
config->key.am_bw_lanes,
0, &rndv_thresh);
if (status != UCS_OK) {
return status;
}

rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh;
ucs_trace("active message rendezvous threshold is %zu", rndv_thresh);
} else {
Expand All @@ -1889,6 +1946,8 @@ static void ucp_ep_config_set_am_rndv_thresh(

ucs_trace("Active Message rndv threshold is %zu (fast local compl: %zu)",
thresh->remote, thresh->local);

return UCS_OK;
}

static void
Expand All @@ -1901,6 +1960,7 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config,
ucp_rsc_index_t rsc_index;
size_t rndv_thresh, rndv_local_thresh, min_thresh;
uct_iface_attr_t *iface_attr;
ucs_status_t status;

if (lane == UCP_NULL_LANE) {
goto out_not_supported;
Expand All @@ -1915,9 +1975,13 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config,

if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
/* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/
rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
lanes, 1);
status = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
lanes, 1, &rndv_thresh);
if (status != UCS_OK) {
goto out_not_supported;
}

rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh;
} else {
rndv_thresh = context->config.ext.rndv_thresh;
Expand Down Expand Up @@ -2349,10 +2413,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
&config->tag.rndv.rma_thresh);

md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr,
config, min_am_rndv_thresh,
max_am_rndv_thresh,
&config->tag.rndv.am_thresh);
status = ucp_ep_config_set_am_rndv_thresh(worker, iface_attr,
md_attr, config, min_am_rndv_thresh,
max_am_rndv_thresh, &config->tag.rndv.am_thresh);
if (status != UCS_OK) {
goto err_free_dst_mds;
}
}

config->tag.eager.max_short = ucp_ep_config_max_short(
Expand Down Expand Up @@ -2408,10 +2474,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
&config->rndv.rma_thresh);
}

ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr,
config,
iface_attr->cap.am.min_zcopy,
SIZE_MAX, &config->rndv.am_thresh);
status = ucp_ep_config_set_am_rndv_thresh(worker, iface_attr,
md_attr, config, iface_attr->cap.am.min_zcopy, SIZE_MAX,
&config->rndv.am_thresh);
if (status != UCS_OK) {
goto err_free_dst_mds;
}

am_max_eager_short = ucp_ep_config_max_short(
worker->context, iface_attr, UCT_IFACE_FLAG_AM_SHORT,
Expand Down Expand Up @@ -2466,8 +2534,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,

memset(&config->rma, 0, sizeof(config->rma));

rma_zcopy_thresh = ucp_ep_config_calc_rma_zcopy_thresh(worker, config,
config->key.rma_lanes);
status = ucp_ep_config_calc_rma_zcopy_thresh(worker, config,
config->key.rma_lanes,
&rma_zcopy_thresh);
if (status != UCS_OK) {
goto err_free_dst_mds;
}

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
Expand Down
13 changes: 5 additions & 8 deletions test/gtest/ucp/test_ucp_peer_failure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,20 +360,17 @@ void test_ucp_peer_failure::do_test(size_t msg_size, int pre_msg_count,
EXPECT_NE(UCS_OK, m_err_status);

if (UCS_PTR_IS_PTR(sreq)) {
/* The request may either succeed or fail, even though the data is
* not * delivered - depends on when the error is detected on sender
* side and if zcopy/bcopy protocol is used. In any case, the
* request must complete, and all resources have to be released.
*/
ucs_status_t status = ucp_request_check_status(sreq);
EXPECT_NE(UCS_INPROGRESS, status);
ucs_status_t status;
/* If rendezvous protocol is used, the m_err_count is increased
* on the receiver side, so the send request may not complete
* immediately */
status = request_wait(sreq);
if (request_must_fail) {
EXPECT_TRUE((m_err_status == status) ||
(UCS_ERR_CANCELED == status));
} else {
EXPECT_TRUE((m_err_status == status) || (UCS_OK == status));
}
ucp_request_release(sreq);
}

/* Additional sends must fail */
Expand Down

0 comments on commit fdd015d

Please sign in to comment.