Skip to content

Commit

Permalink
Homa: Add asynchronous send request
Browse files Browse the repository at this point in the history
  • Loading branch information
felipealmeida committed Nov 4, 2023
1 parent 47b453a commit 221ae00
Show file tree
Hide file tree
Showing 5 changed files with 487 additions and 239 deletions.
270 changes: 142 additions & 128 deletions include/boost/asio/basic_homa_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ class basic_homa_socket
: public basic_socket<Protocol, Executor>
{
private:
class initiate_async_send;
class initiate_async_send_to;
class initiate_async_receive;
class initiate_async_receive_from;
//class initiate_async_send;
class initiate_async_send_request_to;
// class initiate_async_receive;
// class initiate_async_receive_from;

public:
/// The type of the executor associated with the object.
Expand Down Expand Up @@ -368,6 +368,19 @@ class basic_homa_socket
boost::asio::detail::throw_error(ec, "receive");
}

/**
*
*
*/
void release_pages(homa_pages const& pages,
const endpoint_type& endpoint)
{
boost::system::error_code ec;
this->impl_.get_service().release_pages(this->impl_.get_implementation(), pages, 0,
detail::homa_ops::homa_recvmsg_nonblocking, ec,
endpoint);
boost::asio::detail::throw_error(ec, "release_pages");
}

/// Send some data on a connected socket.
/**
Expand Down Expand Up @@ -506,22 +519,22 @@ class basic_homa_socket
*
* @li @c cancellation_type::total
*/
template <typename ConstBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) WriteToken = default_completion_token_t<executor_type>>
auto async_send(const ConstBufferSequence& buffers,
WriteToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_send>(), token,
buffers, socket_base::message_flags(0)))
{
return async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_send(this), token,
buffers, socket_base::message_flags(0));
}
// template <typename ConstBufferSequence,
// BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
// std::size_t)) WriteToken = default_completion_token_t<executor_type>>
// auto async_send(const ConstBufferSequence& buffers,
// WriteToken&& token = default_completion_token_t<executor_type>())
// -> decltype(
// async_initiate<WriteToken,
// void (boost::system::error_code, std::size_t)>(
// declval<initiate_async_send>(), token,
// buffers, socket_base::message_flags(0)))
// {
// return async_initiate<WriteToken,
// void (boost::system::error_code, std::size_t)>(
// initiate_async_send(this), token,
// buffers, socket_base::message_flags(0));
// }

/// Start an asynchronous send on a connected socket.
/**
Expand Down Expand Up @@ -567,21 +580,21 @@ class basic_homa_socket
*
* @li @c cancellation_type::total
*/
template <typename ConstBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) WriteToken = default_completion_token_t<executor_type>>
auto async_send(const ConstBufferSequence& buffers,
socket_base::message_flags flags,
WriteToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_send>(), token, buffers, flags))
{
return async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_send(this), token, buffers, flags);
}
// template <typename ConstBufferSequence,
// BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
// std::size_t)) WriteToken = default_completion_token_t<executor_type>>
// auto async_send(const ConstBufferSequence& buffers,
// socket_base::message_flags flags,
// WriteToken&& token = default_completion_token_t<executor_type>())
// -> decltype(
// async_initiate<WriteToken,
// void (boost::system::error_code, std::size_t)>(
// declval<initiate_async_send>(), token, buffers, flags))
// {
// return async_initiate<WriteToken,
// void (boost::system::error_code, std::size_t)>(
// initiate_async_send(this), token, buffers, flags);
// }

/// Send a homa to the specified endpoint.
/**
Expand Down Expand Up @@ -769,23 +782,23 @@ class basic_homa_socket
*
* @li @c cancellation_type::total
*/
template <typename ConstBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) WriteToken = default_completion_token_t<executor_type>>
auto async_send_to(const ConstBufferSequence& buffers,
const endpoint_type& destination,
WriteToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_send_to>(), token, buffers,
destination, socket_base::message_flags(0)))
{
return async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_send_to(this), token, buffers,
destination, socket_base::message_flags(0));
}
// template <typename ConstBufferSequence,
// BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
// std::size_t)) WriteToken = default_completion_token_t<executor_type>>
// auto async_send_to(const ConstBufferSequence& buffers,
// const endpoint_type& destination,
// WriteToken&& token = default_completion_token_t<executor_type>())
// -> decltype(
// async_initiate<WriteToken,
// void (boost::system::error_code, std::size_t)>(
// declval<initiate_async_send_to>(), token, buffers,
// destination, socket_base::message_flags(0)))
// {
// return async_initiate<WriteToken,
// void (boost::system::error_code, std::size_t)>(
// initiate_async_send_to(this), token, buffers,
// destination, socket_base::message_flags(0));
// }

/// Start an asynchronous send.
/**
Expand Down Expand Up @@ -832,19 +845,19 @@ class basic_homa_socket
*/
template <typename ConstBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) WriteToken = default_completion_token_t<executor_type>>
auto async_send_to(const ConstBufferSequence& buffers,
std::size_t, int id)) WriteToken = default_completion_token_t<executor_type>>
auto async_send_request_to(const ConstBufferSequence& buffers,
const endpoint_type& destination, socket_base::message_flags flags,
WriteToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_send_to>(), token,
declval<initiate_async_send_request_to>(), token,
buffers, destination, flags))
{
return async_initiate<WriteToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_send_to(this), token,
initiate_async_send_request_to(this), token,
buffers, destination, flags);
}

Expand Down Expand Up @@ -1247,23 +1260,23 @@ class basic_homa_socket
*
* @li @c cancellation_type::total
*/
template <typename MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) ReadToken = default_completion_token_t<executor_type>>
auto async_receive_from(const MutableBufferSequence& buffers,
endpoint_type& sender_endpoint,
ReadToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<ReadToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_receive_from>(), token, buffers,
&sender_endpoint, socket_base::message_flags(0)))
{
return async_initiate<ReadToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_receive_from(this), token, buffers,
&sender_endpoint, socket_base::message_flags(0));
}
// template <typename MutableBufferSequence,
// BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
// std::size_t)) ReadToken = default_completion_token_t<executor_type>>
// auto async_receive_from(const MutableBufferSequence& buffers,
// endpoint_type& sender_endpoint,
// ReadToken&& token = default_completion_token_t<executor_type>())
// -> decltype(
// async_initiate<ReadToken,
// void (boost::system::error_code, std::size_t)>(
// declval<initiate_async_receive_from>(), token, buffers,
// &sender_endpoint, socket_base::message_flags(0)))
// {
// return async_initiate<ReadToken,
// void (boost::system::error_code, std::size_t)>(
// initiate_async_receive_from(this), token, buffers,
// &sender_endpoint, socket_base::message_flags(0));
// }

/// Start an asynchronous receive.
/**
Expand Down Expand Up @@ -1310,70 +1323,70 @@ class basic_homa_socket
*
* @li @c cancellation_type::total
*/
template <typename MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) ReadToken = default_completion_token_t<executor_type>>
auto async_receive_from(const MutableBufferSequence& buffers,
endpoint_type& sender_endpoint, socket_base::message_flags flags,
ReadToken&& token = default_completion_token_t<executor_type>())
-> decltype(
async_initiate<ReadToken,
void (boost::system::error_code, std::size_t)>(
declval<initiate_async_receive_from>(), token,
buffers, &sender_endpoint, flags))
{
return async_initiate<ReadToken,
void (boost::system::error_code, std::size_t)>(
initiate_async_receive_from(this), token,
buffers, &sender_endpoint, flags);
}
// template <typename MutableBufferSequence,
// BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
// std::size_t)) ReadToken = default_completion_token_t<executor_type>>
// auto async_receive_from(const MutableBufferSequence& buffers,
// endpoint_type& sender_endpoint, socket_base::message_flags flags,
// ReadToken&& token = default_completion_token_t<executor_type>())
// -> decltype(
// async_initiate<ReadToken,
// void (boost::system::error_code, std::size_t)>(
// declval<initiate_async_receive_from>(), token,
// buffers, &sender_endpoint, flags))
// {
// return async_initiate<ReadToken,
// void (boost::system::error_code, std::size_t)>(
// initiate_async_receive_from(this), token,
// buffers, &sender_endpoint, flags);
// }

private:
// Disallow copying and assignment.
basic_homa_socket(const basic_homa_socket&) = delete;
basic_homa_socket& operator=(
const basic_homa_socket&) = delete;

class initiate_async_send
{
public:
typedef Executor executor_type;

explicit initiate_async_send(basic_homa_socket* self)
: self_(self)
{
}

const executor_type& get_executor() const noexcept
{
return self_->get_executor();
}

template <typename WriteHandler, typename ConstBufferSequence>
void operator()(WriteHandler&& handler,
const ConstBufferSequence& buffers,
socket_base::message_flags flags) const
{
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a WriteHandler.
BOOST_ASIO_WRITE_HANDLER_CHECK(WriteHandler, handler) type_check;

detail::non_const_lvalue<WriteHandler> handler2(handler);
self_->impl_.get_service().async_send(
self_->impl_.get_implementation(), buffers, flags,
handler2.value, self_->impl_.get_executor());
}

private:
basic_homa_socket* self_;
};

class initiate_async_send_to
// class initiate_async_send
// {
// public:
// typedef Executor executor_type;

// explicit initiate_async_send(basic_homa_socket* self)
// : self_(self)
// {
// }

// const executor_type& get_executor() const noexcept
// {
// return self_->get_executor();
// }

// template <typename WriteHandler, typename ConstBufferSequence>
// void operator()(WriteHandler&& handler,
// const ConstBufferSequence& buffers,
// socket_base::message_flags flags) const
// {
// // If you get an error on the following line it means that your handler
// // does not meet the documented type requirements for a WriteHandler.
// BOOST_ASIO_WRITE_HANDLER_CHECK(WriteHandler, handler) type_check;

// detail::non_const_lvalue<WriteHandler> handler2(handler);
// self_->impl_.get_service().async_send(
// self_->impl_.get_implementation(), buffers, flags,
// handler2.value, self_->impl_.get_executor());
// }

// private:
// basic_homa_socket* self_;
// };

class initiate_async_send_request_to
{
public:
typedef Executor executor_type;

explicit initiate_async_send_to(basic_homa_socket* self)
explicit initiate_async_send_request_to(basic_homa_socket* self)
: self_(self)
{
}
Expand All @@ -1388,12 +1401,13 @@ class basic_homa_socket
const ConstBufferSequence& buffers, const endpoint_type& destination,
socket_base::message_flags flags) const
{
fprintf(stderr, "initiate_async_send_request_to\n");
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a WriteHandler.
BOOST_ASIO_WRITE_HANDLER_CHECK(WriteHandler, handler) type_check;
//BOOST_ASIO_WRITE_HANDLER_CHECK(WriteHandler, handler) type_check;

detail::non_const_lvalue<WriteHandler> handler2(handler);
self_->impl_.get_service().async_send_to(
self_->impl_.get_service().async_send_request_to(
self_->impl_.get_implementation(), buffers, destination,
flags, handler2.value, self_->impl_.get_executor());
}
Expand Down
Loading

0 comments on commit 221ae00

Please sign in to comment.