From bafa39e68b1d636a576ee903f83ef9e4dff1dcc9 Mon Sep 17 00:00:00 2001 From: Felipe Magno de Almeida Date: Sat, 4 Nov 2023 20:35:27 -0700 Subject: [PATCH] Homa: Add async_receive_request_from --- include/boost/asio/basic_homa_socket.hpp | 105 +++++----- include/boost/asio/detail/impl/homa_ops.ipp | 41 +++- .../reactive_socket_recv_request_from_op.hpp | 193 ++++++++++++++++++ .../asio/detail/reactive_socket_service.hpp | 75 +++---- test/ip/homa.cpp | 21 +- 5 files changed, 336 insertions(+), 99 deletions(-) create mode 100644 include/boost/asio/detail/reactive_socket_recv_request_from_op.hpp diff --git a/include/boost/asio/basic_homa_socket.hpp b/include/boost/asio/basic_homa_socket.hpp index 96bf82c752..86db2b30c6 100644 --- a/include/boost/asio/basic_homa_socket.hpp +++ b/include/boost/asio/basic_homa_socket.hpp @@ -78,7 +78,7 @@ class basic_homa_socket //class initiate_async_send; class initiate_async_send_request_to; // class initiate_async_receive; - // class initiate_async_receive_from; + class initiate_async_receive_request_from; public: /// The type of the executor associated with the object. @@ -1260,23 +1260,23 @@ class basic_homa_socket * * @li @c cancellation_type::total */ - // template > - // auto async_receive_from(const MutableBufferSequence& buffers, - // endpoint_type& sender_endpoint, - // ReadToken&& token = default_completion_token_t()) - // -> decltype( - // async_initiate( - // declval(), token, buffers, - // &sender_endpoint, socket_base::message_flags(0))) - // { - // return async_initiate( - // initiate_async_receive_from(this), token, buffers, - // &sender_endpoint, socket_base::message_flags(0)); - // } + template < + BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code, + std::size_t, homa_pages, std::uint64_t)) ReadToken = default_completion_token_t> + auto async_receive_request_from( + endpoint_type& sender_endpoint, + ReadToken&& token = default_completion_token_t()) + -> decltype( + async_initiate( + declval(), token, + &sender_endpoint, socket_base::message_flags(0))) + { + return async_initiate( + initiate_async_receive_request_from(this), token, + &sender_endpoint, socket_base::message_flags(0)); + } /// Start an asynchronous receive. /** @@ -1416,46 +1416,45 @@ class basic_homa_socket basic_homa_socket* self_; }; - class initiate_async_receive - { - public: - typedef Executor executor_type; + // class initiate_async_receive + // { + // public: + // typedef Executor executor_type; - explicit initiate_async_receive(basic_homa_socket* self) - : self_(self) - { - } + // explicit initiate_async_receive(basic_homa_socket* self) + // : self_(self) + // { + // } - const executor_type& get_executor() const noexcept - { - return self_->get_executor(); - } + // const executor_type& get_executor() const noexcept + // { + // return self_->get_executor(); + // } - template - void operator()(ReadHandler&& handler, - const MutableBufferSequence& buffers, - socket_base::message_flags flags) const - { - // If you get an error on the following line it means that your handler - // does not meet the documented type requirements for a ReadHandler. - BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check; + // template + // void operator()(ReadHandler&& handler, + // socket_base::message_flags flags) const + // { + // // If you get an error on the following line it means that your handler + // // does not meet the documented type requirements for a ReadHandler. + // //BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check; - detail::non_const_lvalue handler2(handler); - self_->impl_.get_service().async_receive( - self_->impl_.get_implementation(), buffers, flags, - handler2.value, self_->impl_.get_executor()); - } + // detail::non_const_lvalue handler2(handler); + // self_->impl_.get_service().async_receive( + // self_->impl_.get_implementation(), flags, + // handler2.value, self_->impl_.get_executor()); + // } - private: - basic_homa_socket* self_; - }; + // private: + // basic_homa_socket* self_; + // }; - class initiate_async_receive_from + class initiate_async_receive_request_from { public: typedef Executor executor_type; - explicit initiate_async_receive_from(basic_homa_socket* self) + explicit initiate_async_receive_request_from(basic_homa_socket* self) : self_(self) { } @@ -1465,18 +1464,18 @@ class basic_homa_socket return self_->get_executor(); } - template + template void operator()(ReadHandler&& handler, - const MutableBufferSequence& buffers, endpoint_type* sender_endpoint, + endpoint_type* sender_endpoint, socket_base::message_flags flags) const { // If you get an error on the following line it means that your handler // does not meet the documented type requirements for a ReadHandler. - BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check; + //BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check; detail::non_const_lvalue handler2(handler); - self_->impl_.get_service().async_receive_from( - self_->impl_.get_implementation(), buffers, *sender_endpoint, + self_->impl_.get_service().async_receive_request_from( + self_->impl_.get_implementation(), *sender_endpoint, flags, handler2.value, self_->impl_.get_executor()); } diff --git a/include/boost/asio/detail/impl/homa_ops.ipp b/include/boost/asio/detail/impl/homa_ops.ipp index 3470b62eb7..b59e7524ec 100644 --- a/include/boost/asio/detail/impl/homa_ops.ipp +++ b/include/boost/asio/detail/impl/homa_ops.ipp @@ -121,7 +121,10 @@ signed_size_type recvfrom(socket_type s, homa_pages& pages, msg.msg_namelen = static_cast(*addrlen); msg.msg_control = &args; msg.msg_controllen = sizeof(args); + fprintf(stderr, "waiting to read %d %d\n", (int)args.num_bpages, args.bpage_offsets[0]); signed_size_type result = ::recvmsg(s, &msg, flags); + fprintf(stderr, "result of recvmsg %d\n", (int)result); + fprintf(stderr, "finished reading first offset at %d\n", (int)args.bpage_offsets[0]); pages.copy_from(args.bpage_offsets, args.num_bpages); id = args.id; completion_cookie = args.completion_cookie; @@ -187,6 +190,7 @@ signed_size_type sendto(socket_type s, const socket_ops::buf* bufs, flags |= MSG_NOSIGNAL; #endif // defined(BOOST_ASIO_HAS_MSG_NOSIGNAL) signed_size_type result = ::sendmsg(s, &msg, flags); + fprintf(stderr, "sendmsg return %d\n", (int)result); id = args.id; socket_ops::get_last_error(ec, result < 0); return result; @@ -216,8 +220,10 @@ size_t sync_sendto(socket_type s, socket_ops::state_type state, // Operation failed. if ((state & socket_ops::user_set_non_blocking) || (ec != boost::asio::error::would_block - && ec != boost::asio::error::try_again)) + && ec != boost::asio::error::try_again)) { + fprintf(stderr, "would block or try again\n"); return 0; + } // Wait for socket to become ready. if (socket_ops::poll_write(s, 0, -1, ec) < 0) @@ -259,6 +265,39 @@ bool non_blocking_send_request_to(socket_type s, } } +bool non_blocking_recvfrom(socket_type s, homa_pages& pages, + int flags, void* addr, std::size_t* addrlen, + boost::system::error_code& ec, size_t& bytes_transferred, std::uint64_t& id, + int homa_flags) +{ + for (;;) + { + // Read some data. + std::uint64_t completion_cookie = 0; + signed_size_type bytes = homa_ops::recvfrom(s, pages, flags, addr, addrlen, id, completion_cookie, homa_flags, ec); + + // Check if operation succeeded. + if (bytes >= 0) + { + bytes_transferred = bytes; + return true; + } + + // Retry operation if interrupted by signal. + if (ec == boost::asio::error::interrupted) + continue; + + // Check if we need to run the operation again. + if (ec == boost::asio::error::would_block + || ec == boost::asio::error::try_again) + return false; + + // Operation failed. + bytes_transferred = 0; + return true; + } +} + } // namespace homa_ops } // namespace detail } // namespace asio diff --git a/include/boost/asio/detail/reactive_socket_recv_request_from_op.hpp b/include/boost/asio/detail/reactive_socket_recv_request_from_op.hpp new file mode 100644 index 0000000000..fdd31cf33a --- /dev/null +++ b/include/boost/asio/detail/reactive_socket_recv_request_from_op.hpp @@ -0,0 +1,193 @@ +// +// detail/reactive_socket_recv_request_from_op.hpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_ASIO_DETAIL_REACTIVE_SOCKET_RECV_REQUEST_FROM_OP_HPP +#define BOOST_ASIO_DETAIL_REACTIVE_SOCKET_RECV_REQUEST_FROM_OP_HPP + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +# pragma once +#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace boost { +namespace asio { +namespace detail { + +template +class reactive_socket_recv_request_from_op_base : public reactor_op +{ +public: + reactive_socket_recv_request_from_op_base(const boost::system::error_code& success_ec, + socket_type socket, int protocol_type, Endpoint& endpoint, + socket_base::message_flags flags, func_type complete_func) + : reactor_op(success_ec, + &reactive_socket_recv_request_from_op_base::do_perform, complete_func), + socket_(socket), + protocol_type_(protocol_type), + pages_(), + id_(0), + sender_endpoint_(endpoint), + flags_(flags) + { + } + + static status do_perform(reactor_op* base) + { + BOOST_ASIO_ASSUME(base != 0); + reactive_socket_recv_request_from_op_base* o( + static_cast(base)); + + std::size_t addr_len = o->sender_endpoint_.capacity(); + status result; + { + result = homa_ops::non_blocking_recvfrom(o->socket_, o->pages_, + o->flags_, + o->sender_endpoint_.data(), &addr_len, + o->ec_, o->bytes_transferred_, o->id_, asio::detail::homa_ops::homa_recvmsg_request) ? done : not_done; + } + + if (result && !o->ec_) + o->sender_endpoint_.resize(addr_len); + + BOOST_ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_recv_request_from", + o->ec_, o->bytes_transferred_, pages, id)); + + return result; + } + + homa_pages pages_; + std::uint64_t id_; +private: + socket_type socket_; + int protocol_type_; + Endpoint& sender_endpoint_; + socket_base::message_flags flags_; +}; + +template +class reactive_socket_recv_request_from_op : + public reactive_socket_recv_request_from_op_base +{ +public: + typedef Handler handler_type; + typedef IoExecutor io_executor_type; + + BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_recv_request_from_op); + + reactive_socket_recv_request_from_op(const boost::system::error_code& success_ec, + socket_type socket, int protocol_type, + Endpoint& endpoint, + socket_base::message_flags flags, Handler& handler, + const IoExecutor& io_ex) + : reactive_socket_recv_request_from_op_base( + success_ec, socket, protocol_type, endpoint, flags, + &reactive_socket_recv_request_from_op::do_complete), + handler_(static_cast(handler)), + work_(handler_, io_ex) + { + } + + static void do_complete(void* owner, operation* base, + const boost::system::error_code& /*ec*/, + std::size_t /*bytes_transferred*/) + { + // Take ownership of the handler object. + BOOST_ASIO_ASSUME(base != 0); + reactive_socket_recv_request_from_op* o( + static_cast(base)); + ptr p = { boost::asio::detail::addressof(o->handler_), o, o }; + + BOOST_ASIO_HANDLER_COMPLETION((*o)); + + // Take ownership of the operation's outstanding work. + handler_work w( + static_cast&&>( + o->work_)); + + BOOST_ASIO_ERROR_LOCATION(o->ec_); + + // Make a copy of the handler so that the memory can be deallocated before + // the upcall is made. Even if we're not about to make an upcall, a + // sub-object of the handler may be the true owner of the memory associated + // with the handler. Consequently, a local copy of the handler is required + // to ensure that any owning sub-object remains valid until after we have + // deallocated the memory here. + detail::binder4 + handler(o->handler_, o->ec_, o->bytes_transferred_, o->pages_, o->id_); + p.h = boost::asio::detail::addressof(handler.handler_); + p.reset(); + + // Make the upcall if required. + if (owner) + { + fenced_block b(fenced_block::half); + BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_)); + w.complete(handler, handler.handler_); + BOOST_ASIO_HANDLER_INVOCATION_END; + } + } + + static void do_immediate(operation* base, bool, const void* io_ex) + { + // Take ownership of the handler object. + BOOST_ASIO_ASSUME(base != 0); + reactive_socket_recv_request_from_op* o( + static_cast(base)); + ptr p = { boost::asio::detail::addressof(o->handler_), o, o }; + + BOOST_ASIO_HANDLER_COMPLETION((*o)); + + // Take ownership of the operation's outstanding work. + immediate_handler_work w( + static_cast&&>( + o->work_)); + + BOOST_ASIO_ERROR_LOCATION(o->ec_); + + // Make a copy of the handler so that the memory can be deallocated before + // the upcall is made. Even if we're not about to make an upcall, a + // sub-object of the handler may be the true owner of the memory associated + // with the handler. Consequently, a local copy of the handler is required + // to ensure that any owning sub-object remains valid until after we have + // deallocated the memory here. + detail::binder2 + handler(o->handler_, o->ec_, o->bytes_transferred_); + p.h = boost::asio::detail::addressof(handler.handler_); + p.reset(); + + BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_)); + w.complete(handler, handler.handler_, io_ex); + BOOST_ASIO_HANDLER_INVOCATION_END; + } + +private: + Handler handler_; + handler_work work_; +}; + +} // namespace detail +} // namespace asio +} // namespace boost + +#include + +#endif // BOOST_ASIO_DETAIL_REACTIVE_SOCKET_RECV_REQUEST_FROM_OP_HPP diff --git a/include/boost/asio/detail/reactive_socket_service.hpp b/include/boost/asio/detail/reactive_socket_service.hpp index 7da678d211..82e1f36878 100644 --- a/include/boost/asio/detail/reactive_socket_service.hpp +++ b/include/boost/asio/detail/reactive_socket_service.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -420,10 +421,10 @@ class reactive_socket_service : // Start an asynchronous receive. The buffer for the data being received and // the sender_endpoint object must both be valid for the lifetime of the // asynchronous operation. - template - void async_receive_from(implementation_type& impl, - const MutableBufferSequence& buffers, endpoint_type& sender_endpoint, + void async_receive_request_from(implementation_type& impl, + endpoint_type& sender_endpoint, socket_base::message_flags flags, Handler& handler, const IoExecutor& io_ex) { @@ -434,13 +435,13 @@ class reactive_socket_service : = boost::asio::get_associated_cancellation_slot(handler); // Allocate and construct an operation to wrap the handler. - typedef reactive_socket_recvfrom_op op; typename op::ptr p = { boost::asio::detail::addressof(handler), op::ptr::allocate(handler), 0 }; int protocol = impl.protocol_.type(); p.p = new (p.v) op(success_ec_, impl.socket_, protocol, - buffers, sender_endpoint, flags, handler, io_ex); + sender_endpoint, flags, handler, io_ex); // Optionally register for per-operation cancellation. if (slot.is_connected()) @@ -460,44 +461,44 @@ class reactive_socket_service : p.v = p.p = 0; } - // Wait until data can be received without blocking. - template - void async_receive_from(implementation_type& impl, const null_buffers&, - endpoint_type& sender_endpoint, socket_base::message_flags flags, - Handler& handler, const IoExecutor& io_ex) - { - bool is_continuation = - boost_asio_handler_cont_helpers::is_continuation(handler); + // // Wait until data can be received without blocking. + // template + // void async_receive_from(implementation_type& impl, const null_buffers&, + // endpoint_type& sender_endpoint, socket_base::message_flags flags, + // Handler& handler, const IoExecutor& io_ex) + // { + // bool is_continuation = + // boost_asio_handler_cont_helpers::is_continuation(handler); - associated_cancellation_slot_t slot - = boost::asio::get_associated_cancellation_slot(handler); + // associated_cancellation_slot_t slot + // = boost::asio::get_associated_cancellation_slot(handler); - // Allocate and construct an operation to wrap the handler. - typedef reactive_null_buffers_op op; - typename op::ptr p = { boost::asio::detail::addressof(handler), - op::ptr::allocate(handler), 0 }; - p.p = new (p.v) op(success_ec_, handler, io_ex); + // // Allocate and construct an operation to wrap the handler. + // typedef reactive_null_buffers_op op; + // typename op::ptr p = { boost::asio::detail::addressof(handler), + // op::ptr::allocate(handler), 0 }; + // p.p = new (p.v) op(success_ec_, handler, io_ex); - // Optionally register for per-operation cancellation. - if (slot.is_connected()) - { - p.p->cancellation_key_ = - &slot.template emplace( - &reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op); - } + // // Optionally register for per-operation cancellation. + // if (slot.is_connected()) + // { + // p.p->cancellation_key_ = + // &slot.template emplace( + // &reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op); + // } - BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket", - &impl, impl.socket_, "async_receive_from(null_buffers)")); + // BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket", + // &impl, impl.socket_, "async_receive_from(null_buffers)")); - // Reset endpoint since it can be given no sensible value at this time. - sender_endpoint = endpoint_type(); + // // Reset endpoint since it can be given no sensible value at this time. + // sender_endpoint = endpoint_type(); - start_op(impl, - (flags & socket_base::message_out_of_band) - ? reactor::except_op : reactor::read_op, - p.p, is_continuation, false, false, &io_ex, 0); - p.v = p.p = 0; - } + // start_op(impl, + // (flags & socket_base::message_out_of_band) + // ? reactor::except_op : reactor::read_op, + // p.p, is_continuation, false, false, &io_ex, 0); + // p.v = p.p = 0; + // } // Accept a new connection. template diff --git a/test/ip/homa.cpp b/test/ip/homa.cpp index 4e35cb3cca..f9434c4939 100644 --- a/test/ip/homa.cpp +++ b/test/ip/homa.cpp @@ -494,15 +494,19 @@ void handle_send_request(size_t expected_bytes_sent, { std::cout << "err " << err << " " << err.message() << std::endl; std::cout << "id of request " << id << std::endl; + std::cout << "bytes sent " << bytes_sent << " expected " << expected_bytes_sent << std::endl; BOOST_ASIO_CHECK(!err); BOOST_ASIO_CHECK(expected_bytes_sent == bytes_sent); } -void handle_recv(size_t expected_bytes_recvd, - const boost::system::error_code& err, size_t bytes_recvd) +void handle_recv_request(char* send_msg, std::size_t send_msg_size, uint8_t* buffer, + size_t expected_bytes_recvd, + const boost::system::error_code& err, size_t bytes_recvd, + const boost::asio::homa_pages& pages, std::uint64_t id) { BOOST_ASIO_CHECK(!err); BOOST_ASIO_CHECK(expected_bytes_recvd == bytes_recvd); + BOOST_ASIO_CHECK(memcmp(send_msg, (buffer + pages.offsets()[0]), send_msg_size) == 0); } void test() @@ -515,6 +519,7 @@ void test() using bindns::placeholders::_1; using bindns::placeholders::_2; using bindns::placeholders::_3; + using bindns::placeholders::_4; io_context ioc; @@ -592,16 +597,16 @@ void test() // memset(recv_msg, 0, sizeof(recv_msg)); fprintf(stderr, "async send\n"); - char recv_msg[sizeof(send_msg)]; // target_endpoint = sender_endpoint; - s1.async_send_request_to(buffer(send_msg, (sizeof(send_msg)-1)), target_endpoint, 0, + s1.async_send_request_to(buffer(send_msg, (sizeof(send_msg)-1)), sender_endpoint, 0, bindns::bind(handle_send_request, (sizeof(send_msg)-1), _1, _2, _3)); - // s2.async_receive_from(buffer(recv_msg, sizeof(recv_msg)), sender_endpoint, - // bindns::bind(handle_recv, sizeof(recv_msg), _1, _2)); + fprintf(stderr, "async receive\n"); + s2.async_receive_request_from(target_endpoint, + bindns::bind(handle_recv_request, send_msg, sizeof(send_msg)-1, + (std::uint8_t*)buffer2.first.data(), + sizeof(send_msg)-1, _1, _2, _3, _4)); ioc.run(); - - BOOST_ASIO_CHECK(memcmp(send_msg, recv_msg, (sizeof(send_msg)-1)) == 0); } } // namespace ip_homa_socket_runtime