From 2ca27525baa235db4ea5e08a853b38a4ca0e5428 Mon Sep 17 00:00:00 2001 From: Christopher Kohlhoff Date: Tue, 1 Aug 2023 22:04:28 +1000 Subject: [PATCH] Add examples showing how to use channels for mutual exclusion. --- doc/examples.qbk | 2 + example/cpp20/channels/Jamfile.v2 | 34 ++++ example/cpp20/channels/mutual_exclusion_1.cpp | 182 +++++++++++++++++ example/cpp20/channels/mutual_exclusion_2.cpp | 192 ++++++++++++++++++ 4 files changed, 410 insertions(+) create mode 100644 example/cpp20/channels/Jamfile.v2 create mode 100644 example/cpp20/channels/mutual_exclusion_1.cpp create mode 100644 example/cpp20/channels/mutual_exclusion_2.cpp diff --git a/doc/examples.qbk b/doc/examples.qbk index 0197bd3f76..203f2f44b3 100644 --- a/doc/examples.qbk +++ b/doc/examples.qbk @@ -677,6 +677,8 @@ Coroutines TS. Example showing how to use a channel in conjunction with C++20 coroutines. +* [@boost_asio/example/cpp20/channels/mutual_exclusion_1.cpp] +* [@boost_asio/example/cpp20/channels/mutual_exclusion_2.cpp] * [@boost_asio/example/cpp20/channels/throttling_proxy.cpp] diff --git a/example/cpp20/channels/Jamfile.v2 b/example/cpp20/channels/Jamfile.v2 new file mode 100644 index 0000000000..210129f9fd --- /dev/null +++ b/example/cpp20/channels/Jamfile.v2 @@ -0,0 +1,34 @@ +# +# Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# + +lib socket ; # SOLARIS +lib nsl ; # SOLARIS +lib ws2_32 ; # NT +lib mswsock ; # NT +lib ipv6 ; # HPUX +lib network ; # HAIKU + +project + : requirements + /boost/system//boost_system + /boost/chrono//boost_chrono + BOOST_ALL_NO_LIB=1 + multi + solaris:socket + solaris:nsl + windows:_WIN32_WINNT=0x0501 + windows,gcc:ws2_32 + windows,gcc:mswsock + windows,gcc-cygwin:__USE_W32_SOCKETS + hpux,gcc:_XOPEN_SOURCE_EXTENDED + hpux:ipv6 + haiku:network + ; + +exe mutual_exclusion_1 : mutual_exclusion_1.cpp ; +exe mutual_exclusion_2 : mutual_exclusion_2.cpp ; +exe throttling_proxy : throttling_proxy.cpp ; diff --git a/example/cpp20/channels/mutual_exclusion_1.cpp b/example/cpp20/channels/mutual_exclusion_1.cpp new file mode 100644 index 0000000000..0118cc7176 --- /dev/null +++ b/example/cpp20/channels/mutual_exclusion_1.cpp @@ -0,0 +1,182 @@ +// +// mutual_exclusion_1.cpp +// ~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include + +using boost::asio::as_tuple; +using boost::asio::awaitable; +using boost::asio::dynamic_buffer; +using boost::asio::co_spawn; +using boost::asio::deferred; +using boost::asio::detached; +using boost::asio::experimental::channel; +using boost::asio::io_context; +using boost::asio::ip::tcp; +using boost::asio::steady_timer; +using namespace boost::asio::buffer_literals; +using namespace std::literals::chrono_literals; + +// This class implements a simple line-based protocol: +// +// * For event line that is received from the client, the session sends a +// message header followed by the content of the line as the message body. +// +// * The session generates heartbeat messages once a second. +// +// This protocol is implemented using two actors, handle_messages() and +// send_heartbeats(), each written as a coroutine. +class line_based_echo_session : + public std::enable_shared_from_this +{ + // The socket used to read from and write to the client. This socket is a + // data member as it is shared between the two actors. + tcp::socket socket_; + + // As both of the actors will write to the socket, we need a lock to prevent + // these writes from overlapping. To achieve this, we use a channel with a + // buffer size of one. The lock is claimed by sending a message to the + // channel, and then released by receiving this message back again. If the + // lock is not held then the channel's buffer is empty, and the send will + // complete without delay. Otherwise, if the lock is held by the other actor, + // then the send operation will not complete until the lock is released. + channel write_lock_{socket_.get_executor(), 1}; + +public: + line_based_echo_session(tcp::socket socket) + : socket_{std::move(socket)} + { + socket_.set_option(tcp::no_delay(true)); + } + + void start() + { + co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->handle_messages(); }, + detached); + + co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->send_heartbeats(); }, + detached); + } + +private: + void stop() + { + socket_.close(); + write_lock_.cancel(); + } + + awaitable handle_messages() + { + try + { + constexpr std::size_t max_line_length = 1024; + std::string data; + for (;;) + { + // Read an entire line from the client. + std::size_t length = co_await async_read_until(socket_, + dynamic_buffer(data, max_line_length), '\n', deferred); + + // Claim the write lock by sending a message to the channel. Since the + // channel signature is void(), there are no arguments to send in the + // message itself. + co_await write_lock_.async_send(deferred); + + // Respond to the client with a message, echoing the line they sent. + co_await async_write(socket_, ""_buf, deferred); + co_await async_write(socket_, dynamic_buffer(data, length), deferred); + + // Release the lock by receiving the message back again. + write_lock_.try_receive([](auto...){}); + } + } + catch (const std::exception&) + { + stop(); + } + } + + awaitable send_heartbeats() + { + steady_timer timer{socket_.get_executor()}; + try + { + for (;;) + { + // Wait one second before trying to send the next heartbeat. + timer.expires_after(1s); + co_await timer.async_wait(deferred); + + // Claim the write lock by sending a message to the channel. Since the + // channel signature is void(), there are no arguments to send in the + // message itself. + co_await write_lock_.async_send(deferred); + + // Send a heartbeat to the client. As the content of the heartbeat + // message never varies, a buffer literal can be used to specify the + // bytes of the message. The memory associated with a buffer literal is + // valid for the lifetime of the program, which mean that the buffer + // can be safely passed as-is to the asynchronous operation. + co_await async_write(socket_, "\n"_buf, deferred); + + // Release the lock by receiving the message back again. + write_lock_.try_receive([](auto...){}); + } + } + catch (const std::exception&) + { + stop(); + } + } + +}; + +awaitable listen(tcp::acceptor& acceptor) +{ + for (;;) + { + auto [e, socket] = co_await acceptor.async_accept(as_tuple(deferred)); + if (!e) + { + std::make_shared(std::move(socket))->start(); + } + } +} + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 3) + { + std::cerr << "Usage: mutual_exclusion_1"; + std::cerr << " \n"; + return 1; + } + + io_context ctx; + + auto listen_endpoint = + *tcp::resolver(ctx).resolve(argv[1], argv[2], + tcp::resolver::passive).begin(); + + tcp::acceptor acceptor(ctx, listen_endpoint); + co_spawn(ctx, listen(acceptor), detached); + ctx.run(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +} diff --git a/example/cpp20/channels/mutual_exclusion_2.cpp b/example/cpp20/channels/mutual_exclusion_2.cpp new file mode 100644 index 0000000000..bb4b7e8ce8 --- /dev/null +++ b/example/cpp20/channels/mutual_exclusion_2.cpp @@ -0,0 +1,192 @@ +// +// mutual_exclusion_1.cpp +// ~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include + +using boost::asio::as_tuple; +using boost::asio::awaitable; +using boost::asio::dynamic_buffer; +using boost::asio::co_spawn; +using boost::asio::deferred; +using boost::asio::detached; +using boost::asio::experimental::channel; +using boost::asio::io_context; +using boost::asio::ip::tcp; +using boost::asio::steady_timer; +using namespace boost::asio::buffer_literals; +using namespace std::literals::chrono_literals; + +// This class implements a simple line-based protocol: +// +// * For event line that is received from the client, the session sends a +// message header followed by the content of the line as the message body. +// +// * The session generates heartbeat messages once a second. +// +// This protocol is implemented using two actors, handle_messages() and +// send_heartbeats(), each written as a coroutine. +class line_based_echo_session : + public std::enable_shared_from_this +{ + // The socket used to read from and write to the client. This socket is a + // data member as it is shared between the two actors. + tcp::socket socket_; + + // As both of the actors will write to the socket, we need a lock to prevent + // these writes from overlapping. To achieve this, we use a channel with a + // buffer size of one. The lock is claimed by sending a message to the + // channel, and then released by receiving this message back again. If the + // lock is not held then the channel's buffer is empty, and the send will + // complete without delay. Otherwise, if the lock is held by the other actor, + // then the send operation will not complete until the lock is released. + channel write_lock_{socket_.get_executor(), 1}; + +public: + line_based_echo_session(tcp::socket socket) + : socket_{std::move(socket)} + { + socket_.set_option(tcp::no_delay(true)); + } + + void start() + { + co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->handle_messages(); }, + detached); + + co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->send_heartbeats(); }, + detached); + } + +private: + void stop() + { + socket_.close(); + write_lock_.cancel(); + } + + awaitable handle_messages() + { + try + { + constexpr std::size_t max_line_length = 1024; + std::string data; + for (;;) + { + // Read an entire line from the client. + std::size_t length = co_await async_read_until(socket_, + dynamic_buffer(data, max_line_length), '\n', deferred); + + // Claim the write lock by sending a message to the channel. Since the + // channel signature is void(), there are no arguments to send in the + // message itself. In this example we optimise for the common case, + // where the lock is not held by the other actor, by first trying a + // non-blocking send. + if (!write_lock_.try_send()) + { + co_await write_lock_.async_send(deferred); + } + + // Respond to the client with a message, echoing the line they sent. + co_await async_write(socket_, ""_buf, deferred); + co_await async_write(socket_, dynamic_buffer(data, length), deferred); + + // Release the lock by receiving the message back again. + write_lock_.try_receive([](auto...){}); + } + } + catch (const std::exception&) + { + stop(); + } + } + + awaitable send_heartbeats() + { + steady_timer timer{socket_.get_executor()}; + try + { + for (;;) + { + // Wait one second before trying to send the next heartbeat. + timer.expires_after(1s); + co_await timer.async_wait(deferred); + + // Claim the write lock by sending a message to the channel. Since the + // channel signature is void(), there are no arguments to send in the + // message itself. In this example we optimise for the common case, + // where the lock is not held by the other actor, by first trying a + // non-blocking send. + if (!write_lock_.try_send()) + { + co_await write_lock_.async_send(deferred); + } + + // Send a heartbeat to the client. As the content of the heartbeat + // message never varies, a buffer literal can be used to specify the + // bytes of the message. The memory associated with a buffer literal is + // valid for the lifetime of the program, which mean that the buffer + // can be safely passed as-is to the asynchronous operation. + co_await async_write(socket_, "\n"_buf, deferred); + + // Release the lock by receiving the message back again. + write_lock_.try_receive([](auto...){}); + } + } + catch (const std::exception&) + { + stop(); + } + } + +}; + +awaitable listen(tcp::acceptor& acceptor) +{ + for (;;) + { + auto [e, socket] = co_await acceptor.async_accept(as_tuple(deferred)); + if (!e) + { + std::make_shared(std::move(socket))->start(); + } + } +} + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 3) + { + std::cerr << "Usage: mutual_exclusion_1"; + std::cerr << " \n"; + return 1; + } + + io_context ctx; + + auto listen_endpoint = + *tcp::resolver(ctx).resolve(argv[1], argv[2], + tcp::resolver::passive).begin(); + + tcp::acceptor acceptor(ctx, listen_endpoint); + co_spawn(ctx, listen(acceptor), detached); + ctx.run(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +}