Skip to content

Commit

Permalink
Add examples showing how to use channels for mutual exclusion.
Browse files Browse the repository at this point in the history
  • Loading branch information
chriskohlhoff committed Aug 1, 2023
1 parent 9009036 commit 2ca2752
Show file tree
Hide file tree
Showing 4 changed files with 410 additions and 0 deletions.
2 changes: 2 additions & 0 deletions doc/examples.qbk
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,8 @@ Coroutines TS.

Example showing how to use a channel in conjunction with C++20 coroutines.

* [@boost_asio/example/cpp20/channels/mutual_exclusion_1.cpp]
* [@boost_asio/example/cpp20/channels/mutual_exclusion_2.cpp]
* [@boost_asio/example/cpp20/channels/throttling_proxy.cpp]


Expand Down
34 changes: 34 additions & 0 deletions example/cpp20/channels/Jamfile.v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#

lib socket ; # SOLARIS
lib nsl ; # SOLARIS
lib ws2_32 ; # NT
lib mswsock ; # NT
lib ipv6 ; # HPUX
lib network ; # HAIKU

project
: requirements
<library>/boost/system//boost_system
<library>/boost/chrono//boost_chrono
<define>BOOST_ALL_NO_LIB=1
<threading>multi
<target-os>solaris:<library>socket
<target-os>solaris:<library>nsl
<target-os>windows:<define>_WIN32_WINNT=0x0501
<target-os>windows,<toolset>gcc:<library>ws2_32
<target-os>windows,<toolset>gcc:<library>mswsock
<target-os>windows,<toolset>gcc-cygwin:<define>__USE_W32_SOCKETS
<target-os>hpux,<toolset>gcc:<define>_XOPEN_SOURCE_EXTENDED
<target-os>hpux:<library>ipv6
<target-os>haiku:<library>network
;

exe mutual_exclusion_1 : mutual_exclusion_1.cpp ;
exe mutual_exclusion_2 : mutual_exclusion_2.cpp ;
exe throttling_proxy : throttling_proxy.cpp ;
182 changes: 182 additions & 0 deletions example/cpp20/channels/mutual_exclusion_1.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//
// mutual_exclusion_1.cpp
// ~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <iostream>
#include <memory>

using boost::asio::as_tuple;
using boost::asio::awaitable;
using boost::asio::dynamic_buffer;
using boost::asio::co_spawn;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::experimental::channel;
using boost::asio::io_context;
using boost::asio::ip::tcp;
using boost::asio::steady_timer;
using namespace boost::asio::buffer_literals;
using namespace std::literals::chrono_literals;

// This class implements a simple line-based protocol:
//
// * For event line that is received from the client, the session sends a
// message header followed by the content of the line as the message body.
//
// * The session generates heartbeat messages once a second.
//
// This protocol is implemented using two actors, handle_messages() and
// send_heartbeats(), each written as a coroutine.
class line_based_echo_session :
public std::enable_shared_from_this<line_based_echo_session>
{
// The socket used to read from and write to the client. This socket is a
// data member as it is shared between the two actors.
tcp::socket socket_;

// As both of the actors will write to the socket, we need a lock to prevent
// these writes from overlapping. To achieve this, we use a channel with a
// buffer size of one. The lock is claimed by sending a message to the
// channel, and then released by receiving this message back again. If the
// lock is not held then the channel's buffer is empty, and the send will
// complete without delay. Otherwise, if the lock is held by the other actor,
// then the send operation will not complete until the lock is released.
channel<void()> write_lock_{socket_.get_executor(), 1};

public:
line_based_echo_session(tcp::socket socket)
: socket_{std::move(socket)}
{
socket_.set_option(tcp::no_delay(true));
}

void start()
{
co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->handle_messages(); },
detached);

co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->send_heartbeats(); },
detached);
}

private:
void stop()
{
socket_.close();
write_lock_.cancel();
}

awaitable<void> handle_messages()
{
try
{
constexpr std::size_t max_line_length = 1024;
std::string data;
for (;;)
{
// Read an entire line from the client.
std::size_t length = co_await async_read_until(socket_,
dynamic_buffer(data, max_line_length), '\n', deferred);

// Claim the write lock by sending a message to the channel. Since the
// channel signature is void(), there are no arguments to send in the
// message itself.
co_await write_lock_.async_send(deferred);

// Respond to the client with a message, echoing the line they sent.
co_await async_write(socket_, "<line>"_buf, deferred);
co_await async_write(socket_, dynamic_buffer(data, length), deferred);

// Release the lock by receiving the message back again.
write_lock_.try_receive([](auto...){});
}
}
catch (const std::exception&)
{
stop();
}
}

awaitable<void> send_heartbeats()
{
steady_timer timer{socket_.get_executor()};
try
{
for (;;)
{
// Wait one second before trying to send the next heartbeat.
timer.expires_after(1s);
co_await timer.async_wait(deferred);

// Claim the write lock by sending a message to the channel. Since the
// channel signature is void(), there are no arguments to send in the
// message itself.
co_await write_lock_.async_send(deferred);

// Send a heartbeat to the client. As the content of the heartbeat
// message never varies, a buffer literal can be used to specify the
// bytes of the message. The memory associated with a buffer literal is
// valid for the lifetime of the program, which mean that the buffer
// can be safely passed as-is to the asynchronous operation.
co_await async_write(socket_, "<heartbeat>\n"_buf, deferred);

// Release the lock by receiving the message back again.
write_lock_.try_receive([](auto...){});
}
}
catch (const std::exception&)
{
stop();
}
}

};

awaitable<void> listen(tcp::acceptor& acceptor)
{
for (;;)
{
auto [e, socket] = co_await acceptor.async_accept(as_tuple(deferred));
if (!e)
{
std::make_shared<line_based_echo_session>(std::move(socket))->start();
}
}
}

int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: mutual_exclusion_1";
std::cerr << " <listen_address> <listen_port>\n";
return 1;
}

io_context ctx;

auto listen_endpoint =
*tcp::resolver(ctx).resolve(argv[1], argv[2],
tcp::resolver::passive).begin();

tcp::acceptor acceptor(ctx, listen_endpoint);
co_spawn(ctx, listen(acceptor), detached);
ctx.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}
Loading

0 comments on commit 2ca2752

Please sign in to comment.