Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpi pp: fix messages larger than INT_MAX #6580

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ if(HPX_WITH_NETWORKING)
ADVANCED
)
hpx_option(
HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.7"
HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.8"
CATEGORY "Build Targets"
ADVANCED
)
Expand Down
5 changes: 5 additions & 0 deletions libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2013-2015 Thomas Heller
// Copyright (c) 2024 Jiakun Yan
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -42,6 +43,10 @@ namespace hpx::util {

static std::string get_processor_name();

static MPI_Datatype type_contiguous(size_t nbytes);
static MPI_Request isend(void* address, size_t size, int rank, int tag);
static MPI_Request irecv(void* address, size_t size, int rank, int tag);

struct HPX_CORE_EXPORT scoped_lock
{
scoped_lock();
Expand Down
92 changes: 92 additions & 0 deletions libs/core/mpi_base/src/mpi_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) 2020 Google
// Copyright (c) 2022 Patrick Diehl
// Copyright (c) 2023 Hartmut Kaiser
// Copyright (c) 2024 Jiakun Yan
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -467,6 +468,97 @@ namespace hpx::util {

report_error(sl, error_code);
}

// Acknowledgement: code adapted from github.com/jeffhammond/BigMPI
MPI_Datatype mpi_environment::type_contiguous(size_t nbytes)
{
size_t int_max = (std::numeric_limits<int>::max)();

size_t c = nbytes / int_max;
size_t r = nbytes % int_max;

HPX_ASSERT(c < int_max);
HPX_ASSERT(r < int_max);

MPI_Datatype chunks;
MPI_Type_vector(c, int_max, int_max, MPI_BYTE, &chunks);

MPI_Datatype remainder;
MPI_Type_contiguous(r, MPI_BYTE, &remainder);

MPI_Aint remdisp = (MPI_Aint) c * int_max;
int blocklengths[2] = {1, 1};
MPI_Aint displacements[2] = {0, remdisp};
MPI_Datatype types[2] = {chunks, remainder};
MPI_Datatype newtype;
MPI_Type_create_struct(2, blocklengths, displacements, types, &newtype);

MPI_Type_free(&chunks);
MPI_Type_free(&remainder);

return newtype;
}

MPI_Request mpi_environment::isend(
void* address, size_t size, int rank, int tag)
{
MPI_Request request;
MPI_Datatype datatype;
int length;
if (size > static_cast<size_t>((std::numeric_limits<int>::max)()))
{
datatype = type_contiguous(size);
MPI_Type_commit(&datatype);
length = 1;
}
else
{
datatype = MPI_BYTE;
length = static_cast<int>(size);
}

{
scoped_lock l;
int const ret = MPI_Isend(
address, length, datatype, rank, tag, communicator(), &request);
check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret);
}

if (datatype != MPI_BYTE)
MPI_Type_free(&datatype);
return request;
}

MPI_Request mpi_environment::irecv(
void* address, size_t size, int rank, int tag)
{
MPI_Request request;
MPI_Datatype datatype;
int length;
if (size > static_cast<size_t>((std::numeric_limits<int>::max)()))
{
datatype = type_contiguous(size);
MPI_Type_commit(&datatype);
length = 1;
}
else
{
datatype = MPI_BYTE;
length = static_cast<int>(size);
}

{
scoped_lock l;
int const ret = MPI_Irecv(
address, length, datatype, rank, tag, communicator(), &request);
check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret);
}

if (datatype != MPI_BYTE)
MPI_Type_free(&datatype);

return request;
}
} // namespace hpx::util

#endif
18 changes: 8 additions & 10 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
if (tchunk_size <= max_header_size - current_header_size)
{
current_header_size += tchunk_size;
}
Expand Down Expand Up @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
set<pos_numbytes_tchunk>(static_cast<value_type>(tchunk_size));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <= max_header_size - current_header_size)
{
data_[pos_piggy_back_flag_tchunk] = 1;
std::memcpy(&data_[current_header_size],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand All @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_.resize(num_zero_copy_chunks);
for (int j = 0; j < num_zero_copy_chunks; ++j)
{
std::size_t chunk_size =
buffer.transmission_chunks_[j].second;
size_t chunk_size = buffer.transmission_chunks_[j].second;
HPX_ASSERT(iovec.lbuffers[i].length == chunk_size);
buffer.chunks_[j] = serialization::create_pointer_chunk(
iovec.lbuffers[i].address, chunk_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
rcvd_chunks,
locked
};
LCI_comp_t unified_recv(void* address, int length);
LCI_comp_t unified_recv(void* address, size_t length);
return_t receive_transmission_chunks();
return_t receive_data();
return_t receive_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
locked,
};
return_t send_header();
return_t unified_followup_send(void* address, int length);
return_t unified_followup_send(void* address, size_t length);
return_t send_transmission_chunks();
return_t send_data();
return_t send_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci {
std::vector<
typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
iovec.lbuffers[i].address = tchunks.data();
iovec.lbuffers[i].length = tchunks_length;
if (config_t::reg_mem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand Down Expand Up @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci {
}

LCI_comp_t receiver_connection_sendrecv::unified_recv(
void* address, int length)
void* address, size_t length)
{
LCI_comp_t completion =
device_p->completion_manager_p->recv_followup->alloc_completion();
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t mbuffer;
mbuffer.address = address;
Expand Down Expand Up @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_tchunks)
{
auto& tchunks = buffer.transmission_chunks_;
int tchunk_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunk_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length);
state.store(next_state, std::memory_order_release);
Expand All @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_data)
{
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(
buffer.data_.data(), static_cast<int>(buffer.data_.size()));
LCI_comp_t completion =
unified_recv(buffer.data_.data(), buffer.data_.size());
state.store(next_state, std::memory_order_release);
return {false, completion};
}
Expand Down Expand Up @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci {
HPX_UNUSED(chunk_size);

state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand All @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_[idx] =
serialization::create_pointer_chunk(chunk.data(), chunk.size());
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci {
}

sender_connection_sendrecv::return_t
sender_connection_sendrecv::unified_followup_send(void* address, int length)
sender_connection_sendrecv::unified_followup_send(
void* address, size_t length)
{
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t buffer;
buffer.address = address;
Expand Down Expand Up @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci {

std::vector<typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_size = (int) tchunks.size() *
size_t tchunks_size = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
auto ret = unified_followup_send(tchunks.data(), tchunks_size);
Expand Down Expand Up @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci {
{
state.store(
connection_state::locked, std::memory_order_relaxed);
auto ret =
unified_followup_send(const_cast<void*>(chunk.data_.cpos_),
static_cast<int>(chunk.size_));
auto ret = unified_followup_send(
const_cast<void*>(chunk.data_.cpos_), chunk.size_);
if (ret.status == return_status_t::done)
{
++send_chunks_idx;
Expand Down
Loading
Loading