From af8c4381985e07c0f3d572e9604a7daf84a9674d Mon Sep 17 00:00:00 2001 From: Artemy Kovalyov Date: Sat, 19 Feb 2022 22:29:30 +0200 Subject: [PATCH] UCP/EP: Add eager multi-fragment overhead --- src/ucp/core/ucp_ep.c | 160 +++++++++++++++++------- test/gtest/ucp/test_ucp_peer_failure.cc | 13 +- 2 files changed, 121 insertions(+), 52 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 1462a383856..cf07dd05666 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1696,10 +1696,11 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, return 1; } -static void ucp_ep_config_calc_params(ucp_worker_h worker, - const ucp_ep_config_t *config, - const ucp_lane_index_t *lanes, - ucp_ep_thresh_params_t *params) +static ucs_status_t ucp_ep_config_calc_params(ucp_worker_h worker, + const ucp_ep_config_t *config, + const ucp_lane_index_t *lanes, + ucp_ep_thresh_params_t *params, + int eager) { ucp_context_h context = worker->context; ucp_md_map_t md_map = 0; @@ -1708,6 +1709,10 @@ static void ucp_ep_config_calc_params(ucp_worker_h worker, ucp_md_index_t md_index; uct_md_attr_t *md_attr; uct_iface_attr_t *iface_attr; + ucp_worker_iface_t *wiface; + uct_perf_attr_t perf_attr; + ucs_status_t status; + double bw; int i; memset(params, 0, sizeof(*params)); @@ -1734,30 +1739,61 @@ static void ucp_ep_config_calc_params(ucp_worker_h worker, } } - params->bw += ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth); + bw = ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth); + if (eager && (iface_attr->cap.am.max_bcopy > 0)) { + /* Eager protocol has overhead for each fragment */ + perf_attr.field_mask = UCT_PERF_ATTR_FIELD_OPERATION | + UCT_PERF_ATTR_FIELD_SEND_PRE_OVERHEAD | + UCT_PERF_ATTR_FIELD_SEND_POST_OVERHEAD; + perf_attr.operation = UCT_EP_OP_AM_ZCOPY; + + wiface = ucp_worker_iface(worker, rsc_index); + status = uct_iface_estimate_perf(wiface->iface, &perf_attr); + if (status != UCS_OK) { + return status; + } + + params->bw += 1.0 / ((1.0 / bw) + ((perf_attr.send_pre_overhead + + perf_attr.send_post_overhead) / + iface_attr->cap.am.max_bcopy)); + } else { + params->bw += bw; + } } + + return UCS_OK; } -static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker, - const ucp_ep_config_t *config, - const ucp_lane_index_t *eager_lanes, - const ucp_lane_index_t *rndv_lanes, - int recv_reg_cost) +static ucs_status_t +ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker, + const ucp_ep_config_t *config, + const ucp_lane_index_t *eager_lanes, + const ucp_lane_index_t *rndv_lanes, + int recv_reg_cost, size_t *thresh_p) { ucp_context_h context = worker->context; double diff_percent = 1.0 - context->config.ext.rndv_perf_diff / 100.0; ucp_ep_thresh_params_t eager_zcopy; ucp_ep_thresh_params_t rndv; - double numerator, denumerator; + double numerator, denominator; ucp_rsc_index_t eager_rsc_index; uct_iface_attr_t *eager_iface_attr; + ucs_status_t status; double rts_latency; /* All formulas and descriptions are listed at * https://github.com/openucx/ucx/wiki/Rendezvous-Protocol-threshold-for-multilane-mode */ - ucp_ep_config_calc_params(worker, config, eager_lanes, &eager_zcopy); - ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv); + status = ucp_ep_config_calc_params(worker, config, eager_lanes, + &eager_zcopy, 1); + if (status != UCS_OK) { + return status; + } + + status = ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv, 0); + if (status != UCS_OK) { + return status; + } if ((eager_zcopy.bw == 0) || (rndv.bw == 0)) { goto fallback; @@ -1774,17 +1810,23 @@ static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker, (2 * eager_zcopy.overhead) + rndv.overhead) - eager_zcopy.reg_overhead - eager_zcopy.overhead; - denumerator = eager_zcopy.reg_growth + + denominator = eager_zcopy.reg_growth + 1.0 / ucs_min(eager_zcopy.bw, context->config.ext.bcopy_bw) - diff_percent * (rndv.reg_growth * (1 + recv_reg_cost) + 1.0 / rndv.bw); - if ((numerator > 0) && (denumerator > 0)) { - return ucs_max(numerator / denumerator, eager_iface_attr->cap.am.max_bcopy); + if ((numerator <= 0) || (denominator <= 0)) { + goto fallback; } + *thresh_p = ucs_max(numerator / denominator, + eager_iface_attr->cap.am.max_bcopy); + return UCS_OK; + fallback: - return context->config.ext.rndv_thresh_fallback; + *thresh_p = context->config.ext.rndv_thresh_fallback; + return UCS_OK; + } static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value, @@ -1800,18 +1842,24 @@ static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value, return thresh; } -static size_t ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker, - const ucp_ep_config_t *config, - const ucp_lane_index_t *rma_lanes) +static ucs_status_t +ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker, + const ucp_ep_config_t *config, + const ucp_lane_index_t *rma_lanes, + ssize_t *thresh_p) { ucp_context_h context = worker->context; double bcopy_bw = context->config.ext.bcopy_bw; ucp_ep_thresh_params_t rma; uct_md_attr_t *md_attr; - double numerator, denumerator; + double numerator, denominator; double reg_overhead, reg_growth; + ucs_status_t status; - ucp_ep_config_calc_params(worker, config, rma_lanes, &rma); + status = ucp_ep_config_calc_params(worker, config, rma_lanes, &rma, 0); + if (status != UCS_OK) { + return status; + } if (rma.bw == 0) { goto fallback; @@ -1827,14 +1875,18 @@ static size_t ucp_ep_config_calc_rma_zcopy_thresh(ucp_worker_t *worker, } numerator = reg_overhead; - denumerator = (1 / bcopy_bw) - reg_growth; + denominator = (1 / bcopy_bw) - reg_growth; - if (denumerator > 0) { - return numerator / denumerator; + if (denominator <= 0) { + goto fallback; } + *thresh_p = numerator / denominator; + return UCS_OK; + fallback: - return SIZE_MAX; + *thresh_p = SIZE_MAX; + return UCS_OK; } static void ucp_ep_config_adjust_max_short(ssize_t *max_short, @@ -1859,23 +1911,28 @@ static void ucp_ep_config_init_short_thresh(ucp_memtype_thresh_t *thresh) thresh->memtype_off = -1; } -static void ucp_ep_config_set_am_rndv_thresh( +static ucs_status_t ucp_ep_config_set_am_rndv_thresh( ucp_worker_h worker, uct_iface_attr_t *iface_attr, uct_md_attr_t *md_attr, ucp_ep_config_t *config, size_t min_rndv_thresh, size_t max_rndv_thresh, ucp_rndv_thresh_t *thresh) { ucp_context_h context = worker->context; size_t rndv_thresh, rndv_local_thresh, min_thresh; + ucs_status_t status; ucs_assert(config->key.am_lane != UCP_NULL_LANE); ucs_assert(config->key.lanes[config->key.am_lane].rsc_index != UCP_NULL_RESOURCE); if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) { /* auto - Make UCX calculate the AM rndv threshold on its own.*/ - rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config, - config->key.am_bw_lanes, - config->key.am_bw_lanes, - 0); + status = ucp_ep_config_calc_rndv_thresh(worker, config, + config->key.am_bw_lanes, + config->key.am_bw_lanes, + 0, &rndv_thresh); + if (status != UCS_OK) { + return status; + } + rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh; ucs_trace("active message rendezvous threshold is %zu", rndv_thresh); } else { @@ -1889,6 +1946,8 @@ static void ucp_ep_config_set_am_rndv_thresh( ucs_trace("Active Message rndv threshold is %zu (fast local compl: %zu)", thresh->remote, thresh->local); + + return UCS_OK; } static void @@ -1901,6 +1960,7 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config, ucp_rsc_index_t rsc_index; size_t rndv_thresh, rndv_local_thresh, min_thresh; uct_iface_attr_t *iface_attr; + ucs_status_t status; if (lane == UCP_NULL_LANE) { goto out_not_supported; @@ -1915,9 +1975,13 @@ ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker, ucp_ep_config_t *config, if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) { /* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own.*/ - rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config, - config->key.am_bw_lanes, - lanes, 1); + status = ucp_ep_config_calc_rndv_thresh(worker, config, + config->key.am_bw_lanes, + lanes, 1, &rndv_thresh); + if (status != UCS_OK) { + goto out_not_supported; + } + rndv_local_thresh = context->config.ext.rndv_send_nbr_thresh; } else { rndv_thresh = context->config.ext.rndv_thresh; @@ -2349,10 +2413,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, &config->tag.rndv.rma_thresh); md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr; - ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr, - config, min_am_rndv_thresh, - max_am_rndv_thresh, - &config->tag.rndv.am_thresh); + status = ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, + md_attr, config, min_am_rndv_thresh, + max_am_rndv_thresh, &config->tag.rndv.am_thresh); + if (status != UCS_OK) { + goto err_free_dst_mds; + } } config->tag.eager.max_short = ucp_ep_config_max_short( @@ -2408,10 +2474,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, &config->rndv.rma_thresh); } - ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr, - config, - iface_attr->cap.am.min_zcopy, - SIZE_MAX, &config->rndv.am_thresh); + status = ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, + md_attr, config, iface_attr->cap.am.min_zcopy, SIZE_MAX, + &config->rndv.am_thresh); + if (status != UCS_OK) { + goto err_free_dst_mds; + } am_max_eager_short = ucp_ep_config_max_short( worker->context, iface_attr, UCT_IFACE_FLAG_AM_SHORT, @@ -2466,8 +2534,12 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, memset(&config->rma, 0, sizeof(config->rma)); - rma_zcopy_thresh = ucp_ep_config_calc_rma_zcopy_thresh(worker, config, - config->key.rma_lanes); + status = ucp_ep_config_calc_rma_zcopy_thresh(worker, config, + config->key.rma_lanes, + &rma_zcopy_thresh); + if (status != UCS_OK) { + goto err_free_dst_mds; + } /* Configuration for remote memory access */ for (lane = 0; lane < config->key.num_lanes; ++lane) { diff --git a/test/gtest/ucp/test_ucp_peer_failure.cc b/test/gtest/ucp/test_ucp_peer_failure.cc index 3ff06ca9c2f..fca05186988 100644 --- a/test/gtest/ucp/test_ucp_peer_failure.cc +++ b/test/gtest/ucp/test_ucp_peer_failure.cc @@ -360,20 +360,17 @@ void test_ucp_peer_failure::do_test(size_t msg_size, int pre_msg_count, EXPECT_NE(UCS_OK, m_err_status); if (UCS_PTR_IS_PTR(sreq)) { - /* The request may either succeed or fail, even though the data is - * not * delivered - depends on when the error is detected on sender - * side and if zcopy/bcopy protocol is used. In any case, the - * request must complete, and all resources have to be released. - */ - ucs_status_t status = ucp_request_check_status(sreq); - EXPECT_NE(UCS_INPROGRESS, status); + ucs_status_t status; + /* If rendezvous protocol is used, the m_err_count is increased + * on the receiver side, so the send request may not complete + * immediately */ + status = request_wait(sreq); if (request_must_fail) { EXPECT_TRUE((m_err_status == status) || (UCS_ERR_CANCELED == status)); } else { EXPECT_TRUE((m_err_status == status) || (UCS_OK == status)); } - ucp_request_release(sreq); } /* Additional sends must fail */