UCP/EP: Add eager multi-fragment overhead - v1.12.x #8014

Merged
merged 1 commit on Mar 8, 2022
160 changes: 116 additions & 44 deletions src/ucp/core/ucp_ep.c
@@ -1696,10 +1696,11 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
return 1;
}

static void ucp_ep_config_calc_params(ucp_worker_h worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *lanes,
ucp_ep_thresh_params_t *params)
static ucs_status_t ucp_ep_config_calc_params(ucp_worker_h worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *lanes,
ucp_ep_thresh_params_t *params,
int eager)
{
ucp_context_h context = worker->context;
ucp_md_map_t md_map = 0;
@@ -1708,6 +1709,10 @@ static void ucp_ep_config_calc_params(ucp_worker_h worker,
ucp_md_index_t md_index;
uct_md_attr_t *md_attr;
uct_iface_attr_t *iface_attr;
ucp_worker_iface_t *wiface;
uct_perf_attr_t perf_attr;
ucs_status_t status;
double bw;
int i;

memset(params, 0, sizeof(*params));
@@ -1734,30 +1739,61 @@ static void ucp_ep_config_calc_params(ucp_worker_h worker,
}
}

params->bw += ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth);
bw = ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth);
if (eager && (iface_attr->cap.am.max_bcopy > 0)) {
/* Eager protocol has overhead for each fragment */
perf_attr.field_mask = UCT_PERF_ATTR_FIELD_OPERATION |
UCT_PERF_ATTR_FIELD_SEND_PRE_OVERHEAD |
UCT_PERF_ATTR_FIELD_SEND_POST_OVERHEAD;
perf_attr.operation = UCT_EP_OP_AM_ZCOPY;

wiface = ucp_worker_iface(worker, rsc_index);
status = uct_iface_estimate_perf(wiface->iface, &perf_attr);
if (status != UCS_OK) {
return status;
}

params->bw += 1.0 / ((1.0 / bw) + ((perf_attr.send_pre_overhead +
perf_attr.send_post_overhead) /
iface_attr->cap.am.max_bcopy));
} else {
params->bw += bw;
}
}

return UCS_OK;
}

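The eager branch above folds per-fragment CPU overhead into the lane bandwidth: a message sent in fragments of at most cap.am.max_bcopy bytes pays send_pre_overhead + send_post_overhead for every fragment on top of the wire time, so the effective bandwidth becomes 1 / (1/bw + overhead/fragment_size). A minimal standalone sketch of that arithmetic, using hypothetical numbers that are not taken from any real transport:

    #include <stdio.h>

    /* Effective bandwidth of an eager multi-fragment send: each fragment of
     * frag_size bytes costs (pre + post) seconds of CPU overhead in addition
     * to frag_size / bw seconds of wire time. */
    static double effective_eager_bw(double bw, double pre, double post,
                                     double frag_size)
    {
        return 1.0 / ((1.0 / bw) + ((pre + post) / frag_size));
    }

    int main(void)
    {
        double bw   = 12.5e9; /* hypothetical link: 100 Gb/s in bytes/sec */
        double pre  = 40e-9;  /* hypothetical send_pre_overhead, seconds */
        double post = 10e-9;  /* hypothetical send_post_overhead, seconds */
        double frag = 8192;   /* hypothetical cap.am.max_bcopy, bytes */

        /* Prints roughly 11.6 GB/s instead of the raw 12.5 GB/s. */
        printf("effective bw: %.2f GB/s\n",
               effective_eager_bw(bw, pre, post, frag) / 1e9);
        return 0;
    }
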
static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *eager_lanes,
const ucp_lane_index_t *rndv_lanes,
int recv_reg_cost)
static ucs_status_t
ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *eager_lanes,
const ucp_lane_index_t *rndv_lanes,
int recv_reg_cost, size_t *thresh_p)
{
ucp_context_h context = worker->context;
double diff_percent = 1.0 - context->config.ext.rndv_perf_diff / 100.0;
ucp_ep_thresh_params_t eager_zcopy;
ucp_ep_thresh_params_t rndv;
double numerator, denumerator;
double numerator, denominator;
ucp_rsc_index_t eager_rsc_index;
uct_iface_attr_t *eager_iface_attr;
ucs_status_t status;
double rts_latency;

/* All formulas and descriptions are listed at
* https://github.com/openucx/ucx/wiki/Rendezvous-Protocol-threshold-for-multilane-mode */

ucp_ep_config_calc_params(worker, config, eager_lanes, &eager_zcopy);
ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv);
status = ucp_ep_config_calc_params(worker, config, eager_lanes,
&eager_zcopy, 1);
if (status != UCS_OK) {
return status;
}

status = ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv, 0);
if (status != UCS_OK) {
return status;
}

if ((eager_zcopy.bw == 0) || (rndv.bw == 0)) {
goto fallback;
@@ -1774,17 +1810,23 @@ static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
(2 * eager_zcopy.overhead) + rndv.overhead) -
eager_zcopy.reg_overhead - eager_zcopy.overhead;

denumerator = eager_zcopy.reg_growth +
denominator = eager_zcopy.reg_growth +
1.0 / ucs_min(eager_zcopy.bw, context->config.ext.bcopy_bw) -
diff_percent *
(rndv.reg_growth * (1 + recv_reg_cost) + 1.0 / rndv.bw);

if ((numerator > 0) && (denumerator > 0)) {
return ucs_max(numerator / denumerator, eager_iface_attr->cap.am.max_bcopy);
if ((numerator <= 0) || (denominator <= 0)) {
goto fallback;
}

*thresh_p = ucs_max(numerator / denominator,
eager_iface_attr->cap.am.max_bcopy);
return UCS_OK;

fallback:
return context->config.ext.rndv_thresh_fallback;
*thresh_p = context->config.ext.rndv_thresh_fallback;
return UCS_OK;

}

static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value,
@@ -1800,18 +1842,24 @@ static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value,
return thresh;
}

static size_t ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *rma_lanes)
static ucs_status_t
ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *rma_lanes,
ssize_t *thresh_p)
{
ucp_context_h context = worker->context;
double bcopy_bw = context->config.ext.bcopy_bw;
ucp_ep_thresh_params_t rma;
uct_md_attr_t *md_attr;
double numerator, denumerator;
double numerator, denominator;
double reg_overhead, reg_growth;
ucs_status_t status;

ucp_ep_config_calc_params(worker, config, rma_lanes, &rma);
status = ucp_ep_config_calc_params(worker, config, rma_lanes, &rma, 0);
if (status != UCS_OK) {
return status;
}

if (rma.bw == 0) {
goto fallback;
@@ -1827,14 +1875,18 @@ static size_t ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker,
}

numerator = reg_overhead;
denumerator = (1 / bcopy_bw) - reg_growth;
denominator = (1 / bcopy_bw) - reg_growth;

if (denumerator > 0) {
return numerator / denumerator;
if (denominator <= 0) {
goto fallback;
}

*thresh_p = numerator / denominator;
return UCS_OK;

fallback:
return SIZE_MAX;
*thresh_p = SIZE_MAX;
return UCS_OK;
}

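The RMA zero-copy threshold above is the break-even size between copying the buffer (bcopy) and registering it (zcopy): copying n bytes costs roughly n / bcopy_bw, registration costs roughly reg_overhead + n * reg_growth, and equating the two gives n = reg_overhead / (1/bcopy_bw - reg_growth). A short sketch of that crossover, again with hypothetical values:

    #include <stdio.h>

    /* Break-even message size between bcopy (memory copy) and zcopy
     * (memory registration); returns a negative value when registration
     * never pays off. */
    static double rma_zcopy_crossover(double reg_overhead, double reg_growth,
                                      double bcopy_bw)
    {
        double denominator = (1.0 / bcopy_bw) - reg_growth;
        return (denominator > 0.0) ? (reg_overhead / denominator) : -1.0;
    }

    int main(void)
    {
        /* Hypothetical numbers: 2 us registration overhead, 0.06 ns/byte
         * registration growth, 5 GB/s copy bandwidth -> ~14 KB crossover. */
        printf("crossover: %.0f bytes\n",
               rma_zcopy_crossover(2e-6, 0.06e-9, 5e9));
        return 0;
    }
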
static void ucp_ep_config_adjust_max_short(ssize_t *max_short,
@@ -1859,23 +1911,28 @@ static void ucp_ep_config_init_short_thresh(ucp_memtype_thresh_t *thresh)
thresh->memtype_off = -1;
}

static void ucp_ep_config_set_am_rndv_thresh(
static ucs_status_t ucp_ep_config_set_am_rndv_thresh(
ucp_worker_h worker, uct_iface_attr_t *iface_attr,
uct_md_attr_t *md_attr, ucp_ep_config_t *config, size_t min_rndv_thresh,
size_t max_rndv_thresh, ucp_rndv_thresh_t *thresh)
{
ucp_context_h context = worker->context;
size_t rndv_thresh, rndv_local_thresh, min_thresh;
ucs_status_t status;

ucs_assert(config->key.am_lane != UCP_NULL_LANE);
ucs_assert(config->key.lanes[config->key.am_lane].rsc_index != UCP_NULL_RESOURCE);

if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
/* auto - Make UCX calculate the AM rndv threshold on its own.*/
rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
config->key.am_bw_lanes,
0);
status = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
config->key.am_bw_lanes,
0, &rndv_thresh);
if (status != UCS_OK) {
return status;
}

rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh;
ucs_trace("active message rendezvous threshold is %zu", rndv_thresh);
} else {
@@ -1889,6 +1946,8 @@ static void ucp_ep_config_set_am_rndv_thresh(

ucs_trace("Active Message rndv threshold is %zu (fast local compl: %zu)",
thresh->remote, thresh->local);

return UCS_OK;
}

static void
@@ -1901,6 +1960,7 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config,
ucp_rsc_index_t rsc_index;
size_t rndv_thresh, rndv_local_thresh, min_thresh;
uct_iface_attr_t *iface_attr;
ucs_status_t status;

if (lane == UCP_NULL_LANE) {
goto out_not_supported;
@@ -1915,9 +1975,13 @@

if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
/* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/
rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
lanes, 1);
status = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
lanes, 1, &rndv_thresh);
if (status != UCS_OK) {
goto out_not_supported;
}

rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh;
} else {
rndv_thresh = context->config.ext.rndv_thresh;
@@ -2349,10 +2413,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
&config->tag.rndv.rma_thresh);

md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr,
config, min_am_rndv_thresh,
max_am_rndv_thresh,
&config->tag.rndv.am_thresh);
status = ucp_ep_config_set_am_rndv_thresh(worker, iface_attr,
md_attr, config, min_am_rndv_thresh,
max_am_rndv_thresh, &config->tag.rndv.am_thresh);
if (status != UCS_OK) {
goto err_free_dst_mds;
}
}

config->tag.eager.max_short = ucp_ep_config_max_short(
@@ -2408,10 +2474,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
&config->rndv.rma_thresh);
}

ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr,
config,
iface_attr->cap.am.min_zcopy,
SIZE_MAX, &config->rndv.am_thresh);
status = ucp_ep_config_set_am_rndv_thresh(worker, iface_attr,
md_attr, config, iface_attr->cap.am.min_zcopy, SIZE_MAX,
&config->rndv.am_thresh);
if (status != UCS_OK) {
goto err_free_dst_mds;
}

am_max_eager_short = ucp_ep_config_max_short(
worker->context, iface_attr, UCT_IFACE_FLAG_AM_SHORT,
@@ -2466,8 +2534,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,

memset(&config->rma, 0, sizeof(config->rma));

rma_zcopy_thresh = ucp_ep_config_calc_rma_zcopy_thresh(worker, config,
config->key.rma_lanes);
status = ucp_ep_config_calc_rma_zcopy_thresh(worker, config,
config->key.rma_lanes,
&rma_zcopy_thresh);
if (status != UCS_OK) {
goto err_free_dst_mds;
}

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
13 changes: 5 additions & 8 deletions test/gtest/ucp/test_ucp_peer_failure.cc
@@ -360,20 +360,17 @@ void test_ucp_peer_failure::do_test(size_t msg_size, int pre_msg_count,
EXPECT_NE(UCS_OK, m_err_status);

if (UCS_PTR_IS_PTR(sreq)) {
/* The request may either succeed or fail, even though the data is
* not * delivered - depends on when the error is detected on sender
* side and if zcopy/bcopy protocol is used. In any case, the
* request must complete, and all resources have to be released.
*/
ucs_status_t status = ucp_request_check_status(sreq);
EXPECT_NE(UCS_INPROGRESS, status);
ucs_status_t status;
/* If rendezvous protocol is used, the m_err_count is increased
* on the receiver side, so the send request may not complete
* immediately */
status = request_wait(sreq);
if (request_must_fail) {
EXPECT_TRUE((m_err_status == status) ||
(UCS_ERR_CANCELED == status));
} else {
EXPECT_TRUE((m_err_status == status) || (UCS_OK == status));
}
ucp_request_release(sreq);
}

/* Additional sends must fail */