Skip to content

Commit

Permalink
Homa: Add async_receive_request_from
Browse files Browse the repository at this point in the history
  • Loading branch information
felipealmeida committed Nov 5, 2023
1 parent 221ae00 commit bafa39e
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 99 deletions.
105 changes: 52 additions & 53 deletions include/boost/asio/basic_homa_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class basic_homa_socket
//class initiate_async_send;
class initiate_async_send_request_to;
// class initiate_async_receive;
// class initiate_async_receive_from;
class initiate_async_receive_request_from;

public:
/// The type of the executor associated with the object.
Expand Down Expand Up @@ -1260,23 +1260,23 @@ class basic_homa_socket
*
* @li @c cancellation_type::total
*/
// template <typename MutableBufferSequence,
// BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
// std::size_t)) ReadToken = default_completion_token_t<executor_type>>
// auto async_receive_from(const MutableBufferSequence& buffers,
// endpoint_type& sender_endpoint,
// ReadToken&& token = default_completion_token_t<executor_type>())
// -> decltype(
// async_initiate<ReadToken,
// void (boost::system::error_code, std::size_t)>(
// declval<initiate_async_receive_from>(), token, buffers,
// &sender_endpoint, socket_base::message_flags(0)))
// {
// return async_initiate<ReadToken,
// void (boost::system::error_code, std::size_t)>(
// initiate_async_receive_from(this), token, buffers,
// &sender_endpoint, socket_base::message_flags(0));
// }
template <
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t, homa_pages, std::uint64_t)) ReadToken = default_completion_token_t<executor_type>>
auto async_receive_request_from(
endpoint_type& sender_endpoint,
ReadToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<ReadToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_receive_request_from>(), token,
&sender_endpoint, socket_base::message_flags(0)))
{
return async_initiate<ReadToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_receive_request_from(this), token,
&sender_endpoint, socket_base::message_flags(0));
}

/// Start an asynchronous receive.
/**
Expand Down Expand Up @@ -1416,46 +1416,45 @@ class basic_homa_socket
basic_homa_socket* self_;
};

class initiate_async_receive
{
public:
typedef Executor executor_type;
// class initiate_async_receive
// {
// public:
// typedef Executor executor_type;

explicit initiate_async_receive(basic_homa_socket* self)
: self_(self)
{
}
// explicit initiate_async_receive(basic_homa_socket* self)
// : self_(self)
// {
// }

const executor_type& get_executor() const noexcept
{
return self_->get_executor();
}
// const executor_type& get_executor() const noexcept
// {
// return self_->get_executor();
// }

template <typename ReadHandler, typename MutableBufferSequence>
void operator()(ReadHandler&& handler,
const MutableBufferSequence& buffers,
socket_base::message_flags flags) const
{
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a ReadHandler.
BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;
// template <typename ReadHandler, typename MutableBufferSequence>
// void operator()(ReadHandler&& handler,
// socket_base::message_flags flags) const
// {
// // If you get an error on the following line it means that your handler
// // does not meet the documented type requirements for a ReadHandler.
// //BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;

detail::non_const_lvalue<ReadHandler> handler2(handler);
self_->impl_.get_service().async_receive(
self_->impl_.get_implementation(), buffers, flags,
handler2.value, self_->impl_.get_executor());
}
// detail::non_const_lvalue<ReadHandler> handler2(handler);
// self_->impl_.get_service().async_receive(
// self_->impl_.get_implementation(), flags,
// handler2.value, self_->impl_.get_executor());
// }

private:
basic_homa_socket* self_;
};
// private:
// basic_homa_socket* self_;
// };

class initiate_async_receive_from
class initiate_async_receive_request_from
{
public:
typedef Executor executor_type;

explicit initiate_async_receive_from(basic_homa_socket* self)
explicit initiate_async_receive_request_from(basic_homa_socket* self)
: self_(self)
{
}
Expand All @@ -1465,18 +1464,18 @@ class basic_homa_socket
return self_->get_executor();
}

template <typename ReadHandler, typename MutableBufferSequence>
template <typename ReadHandler>
void operator()(ReadHandler&& handler,
const MutableBufferSequence& buffers, endpoint_type* sender_endpoint,
endpoint_type* sender_endpoint,
socket_base::message_flags flags) const
{
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a ReadHandler.
BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;
//BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;

detail::non_const_lvalue<ReadHandler> handler2(handler);
self_->impl_.get_service().async_receive_from(
self_->impl_.get_implementation(), buffers, *sender_endpoint,
self_->impl_.get_service().async_receive_request_from(
self_->impl_.get_implementation(), *sender_endpoint,
flags, handler2.value, self_->impl_.get_executor());
}

Expand Down
41 changes: 40 additions & 1 deletion include/boost/asio/detail/impl/homa_ops.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ signed_size_type recvfrom(socket_type s, homa_pages& pages,
msg.msg_namelen = static_cast<int>(*addrlen);
msg.msg_control = &args;
msg.msg_controllen = sizeof(args);
fprintf(stderr, "waiting to read %d %d\n", (int)args.num_bpages, args.bpage_offsets[0]);
signed_size_type result = ::recvmsg(s, &msg, flags);
fprintf(stderr, "result of recvmsg %d\n", (int)result);
fprintf(stderr, "finished reading first offset at %d\n", (int)args.bpage_offsets[0]);
pages.copy_from(args.bpage_offsets, args.num_bpages);
id = args.id;
completion_cookie = args.completion_cookie;
Expand Down Expand Up @@ -187,6 +190,7 @@ signed_size_type sendto(socket_type s, const socket_ops::buf* bufs,
flags |= MSG_NOSIGNAL;
#endif // defined(BOOST_ASIO_HAS_MSG_NOSIGNAL)
signed_size_type result = ::sendmsg(s, &msg, flags);
fprintf(stderr, "sendmsg return %d\n", (int)result);
id = args.id;
socket_ops::get_last_error(ec, result < 0);
return result;
Expand Down Expand Up @@ -216,8 +220,10 @@ size_t sync_sendto(socket_type s, socket_ops::state_type state,
// Operation failed.
if ((state & socket_ops::user_set_non_blocking)
|| (ec != boost::asio::error::would_block
&& ec != boost::asio::error::try_again))
&& ec != boost::asio::error::try_again)) {
fprintf(stderr, "would block or try again\n");
return 0;
}

// Wait for socket to become ready.
if (socket_ops::poll_write(s, 0, -1, ec) < 0)
Expand Down Expand Up @@ -259,6 +265,39 @@ bool non_blocking_send_request_to(socket_type s,
}
}

bool non_blocking_recvfrom(socket_type s, homa_pages& pages,
int flags, void* addr, std::size_t* addrlen,
boost::system::error_code& ec, size_t& bytes_transferred, std::uint64_t& id,
int homa_flags)
{
for (;;)
{
// Read some data.
std::uint64_t completion_cookie = 0;
signed_size_type bytes = homa_ops::recvfrom(s, pages, flags, addr, addrlen, id, completion_cookie, homa_flags, ec);

// Check if operation succeeded.
if (bytes >= 0)
{
bytes_transferred = bytes;
return true;
}

// Retry operation if interrupted by signal.
if (ec == boost::asio::error::interrupted)
continue;

// Check if we need to run the operation again.
if (ec == boost::asio::error::would_block
|| ec == boost::asio::error::try_again)
return false;

// Operation failed.
bytes_transferred = 0;
return true;
}
}

} // namespace homa_ops
} // namespace detail
} // namespace asio
Expand Down
Loading

0 comments on commit bafa39e

Please sign in to comment.