Skip to content

Commit

Permalink
Add try_send_via_dispatch/try_send_n_via_dispatch functions to channels.
Browse files Browse the repository at this point in the history
  • Loading branch information
chriskohlhoff committed Nov 1, 2023
1 parent dffcedc commit 08b8c3c
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 33 deletions.
24 changes: 24 additions & 0 deletions include/boost/asio/experimental/basic_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,37 @@ class basic_channel
template <typename... Args>
bool try_send(Args&&... args);

/// Try to send a message without blocking, using dispatch semantics to call
/// the receive operation's completion handler.
/**
* Fails if the buffer is full and there are no waiting receive operations.
*
* The receive operation's completion handler may be called from inside this
* function.
*
* @returns @c true on success, @c false on failure.
*/
template <typename... Args>
bool try_send_via_dispatch(Args&&... args);

/// Try to send a number of messages without blocking.
/**
* @returns The number of messages that were sent.
*/
template <typename... Args>
std::size_t try_send_n(std::size_t count, Args&&... args);

/// Try to send a number of messages without blocking, using dispatch
/// semantics to call the receive operations' completion handlers.
/**
* The receive operations' completion handlers may be called from inside this
* function.
*
* @returns The number of messages that were sent.
*/
template <typename... Args>
std::size_t try_send_n_via_dispatch(std::size_t count, Args&&... args);

/// Asynchronously send a message.
/**
* @par Completion Signature
Expand Down
24 changes: 24 additions & 0 deletions include/boost/asio/experimental/basic_concurrent_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,37 @@ class basic_concurrent_channel
template <typename... Args>
bool try_send(Args&&... args);

/// Try to send a message without blocking, using dispatch semantics to call
/// the receive operation's completion handler.
/**
* Fails if the buffer is full and there are no waiting receive operations.
*
* The receive operation's completion handler may be called from inside this
* function.
*
* @returns @c true on success, @c false on failure.
*/
template <typename... Args>
bool try_send_via_dispatch(Args&&... args);

/// Try to send a number of messages without blocking.
/**
* @returns The number of messages that were sent.
*/
template <typename... Args>
std::size_t try_send_n(std::size_t count, Args&&... args);

/// Try to send a number of messages without blocking, using dispatch
/// semantics to call the receive operations' completion handlers.
/**
* The receive operations' completion handlers may be called from inside this
* function.
*
* @returns The number of messages that were sent.
*/
template <typename... Args>
std::size_t try_send_n_via_dispatch(std::size_t count, Args&&... args);

/// Asynchronously send a message.
template <typename... Args,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code))
Expand Down
44 changes: 39 additions & 5 deletions include/boost/asio/experimental/detail/channel_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ class channel_operation BOOST_ASIO_INHERIT_TRACKED_HANDLER
{
destroy_op = 0,
immediate_op = 1,
complete_op = 2,
cancel_op = 3,
close_op = 4
post_op = 2,
dispatch_op = 3,
cancel_op = 4,
close_op = 5
};

typedef void (*func_type)(channel_operation*, action, void*);
Expand Down Expand Up @@ -116,6 +117,17 @@ class channel_operation::handler_work_base
).execute(static_cast<Function&&>(function));
}

template <typename Function, typename Handler>
void dispatch(Function& function, Handler& handler)
{
associated_allocator_t<Handler> allocator =
(get_associated_allocator)(handler);

boost::asio::prefer(executor_,
execution::allocator(allocator)
).execute(static_cast<Function&&>(function));
}

private:
executor_type executor_;
};
Expand Down Expand Up @@ -151,6 +163,16 @@ class channel_operation::handler_work_base<Executor,
static_cast<Function&&>(function), allocator);
}

template <typename Function, typename Handler>
void dispatch(Function& function, Handler& handler)
{
associated_allocator_t<Handler> allocator =
(get_associated_allocator)(handler);

work_.get_executor().dispatch(
static_cast<Function&&>(function), allocator);
}

private:
executor_work_guard<Executor> work_;
};
Expand All @@ -177,11 +199,17 @@ class channel_operation::handler_work :
}

template <typename Function>
void complete(Function& function, Handler& handler)
void post(Function& function, Handler& handler)
{
base2_type::post(function, handler);
}

template <typename Function>
void dispatch(Function& function, Handler& handler)
{
base2_type::dispatch(function, handler);
}

template <typename Function>
void immediate(Function& function, Handler& handler, ...)
{
Expand Down Expand Up @@ -234,11 +262,17 @@ class channel_operation::handler_work<
}

template <typename Function>
void complete(Function& function, Handler& handler)
void post(Function& function, Handler& handler)
{
base1_type::post(function, handler);
}

template <typename Function>
void dispatch(Function& function, Handler& handler)
{
base1_type::dispatch(function, handler);
}

template <typename Function>
void immediate(Function& function, Handler& handler, ...)
{
Expand Down
13 changes: 10 additions & 3 deletions include/boost/asio/experimental/detail/channel_receive_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ class channel_receive : public channel_operation
func_(this, immediate_op, &payload);
}

void complete(Payload payload)
void post(Payload payload)
{
func_(this, complete_op, &payload);
func_(this, post_op, &payload);
}

void dispatch(Payload payload)
{
func_(this, dispatch_op, &payload);
}

protected:
Expand Down Expand Up @@ -95,8 +100,10 @@ class channel_receive_op : public channel_receive<Payload>
BOOST_ASIO_HANDLER_INVOCATION_BEGIN(());
if (a == channel_operation::immediate_op)
w.immediate(handler, handler.handler_, 0);
else if (a == channel_operation::dispatch_op)
w.dispatch(handler, handler.handler_);
else
w.complete(handler, handler.handler_);
w.post(handler, handler.handler_);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
else
Expand Down
56 changes: 52 additions & 4 deletions include/boost/asio/experimental/detail/channel_send_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@ class channel_send_functions<Derived, Executor, R(Args...)>
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send<message_type>(
self->impl_, static_cast<Args2&&>(args)...);
self->impl_, false, static_cast<Args2&&>(args)...);
}

template <typename... Args2>
enable_if_t<
is_constructible<detail::channel_message<R(Args...)>, int, Args2...>::value,
bool
> try_send_via_dispatch(Args2&&... args)
{
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send<message_type>(
self->impl_, true, static_cast<Args2&&>(args)...);
}

template <typename... Args2>
Expand All @@ -56,7 +68,19 @@ class channel_send_functions<Derived, Executor, R(Args...)>
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send_n<message_type>(
self->impl_, count, static_cast<Args2&&>(args)...);
self->impl_, count, false, static_cast<Args2&&>(args)...);
}

template <typename... Args2>
enable_if_t<
is_constructible<detail::channel_message<R(Args...)>, int, Args2...>::value,
std::size_t
> try_send_n_via_dispatch(std::size_t count, Args2&&... args)
{
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send_n<message_type>(
self->impl_, count, true, static_cast<Args2&&>(args)...);
}

template <
Expand Down Expand Up @@ -99,7 +123,19 @@ class channel_send_functions<Derived, Executor, R(Args...), Signatures...> :
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send<message_type>(
self->impl_, static_cast<Args2&&>(args)...);
self->impl_, false, static_cast<Args2&&>(args)...);
}

template <typename... Args2>
enable_if_t<
is_constructible<detail::channel_message<R(Args...)>, int, Args2...>::value,
bool
> try_send_via_dispatch(Args2&&... args)
{
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send<message_type>(
self->impl_, true, static_cast<Args2&&>(args)...);
}

template <typename... Args2>
Expand All @@ -111,7 +147,19 @@ class channel_send_functions<Derived, Executor, R(Args...), Signatures...> :
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send_n<message_type>(
self->impl_, count, static_cast<Args2&&>(args)...);
self->impl_, count, false, static_cast<Args2&&>(args)...);
}

template <typename... Args2>
enable_if_t<
is_constructible<detail::channel_message<R(Args...)>, int, Args2...>::value,
std::size_t
> try_send_n_via_dispatch(std::size_t count, Args2&&... args)
{
typedef typename detail::channel_message<R(Args...)> message_type;
Derived* self = static_cast<Derived*>(this);
return self->service_->template try_send_n<message_type>(
self->impl_, count, true, static_cast<Args2&&>(args)...);
}

template <
Expand Down
6 changes: 3 additions & 3 deletions include/boost/asio/experimental/detail/channel_send_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class channel_send : public channel_operation
func_(this, immediate_op, 0);
}

void complete()
void post()
{
func_(this, complete_op, 0);
func_(this, post_op, 0);
}

void cancel()
Expand Down Expand Up @@ -130,7 +130,7 @@ class channel_send_op : public channel_send<Payload>
if (a == channel_operation::immediate_op)
w.immediate(handler, handler.handler_, 0);
else
w.complete(handler, handler.handler_);
w.post(handler, handler.handler_);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
}
Expand Down
10 changes: 5 additions & 5 deletions include/boost/asio/experimental/detail/channel_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ class channel_service
template <typename Message, typename Traits,
typename... Signatures, typename... Args>
bool try_send(implementation_type<Traits, Signatures...>& impl,
Args&&... args);
bool via_dispatch, Args&&... args);

// Synchronously send a number of new values into the channel.
template <typename Message, typename Traits,
typename... Signatures, typename... Args>
std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl,
std::size_t count, Args&&... args);
std::size_t count, bool via_dispatch, Args&&... args);

// Asynchronously send a new value into the channel.
template <typename Traits, typename... Signatures,
Expand Down Expand Up @@ -221,17 +221,17 @@ class channel_service
private:
// Helper function object to handle a closed notification.
template <typename Payload, typename Signature>
struct complete_receive
struct post_receive
{
explicit complete_receive(channel_receive<Payload>* op)
explicit post_receive(channel_receive<Payload>* op)
: op_(op)
{
}

template <typename... Args>
void operator()(Args&&... args)
{
op_->complete(
op_->post(
channel_message<Signature>(0,
static_cast<Args&&>(args)...));
}
Expand Down
Loading

0 comments on commit 08b8c3c

Please sign in to comment.