Skip to content

Commit

Permalink
Merge branch 'ros2:rolling' into rolling
Browse files Browse the repository at this point in the history
  • Loading branch information
imstevenpmwork authored Dec 11, 2024
2 parents 3bf7153 + ca5058c commit a50350c
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 121 deletions.
88 changes: 34 additions & 54 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,30 @@

namespace
{
///=============================================================================
struct ClientDataWrapper
{
explicit ClientDataWrapper(std::shared_ptr<rmw_zenoh_cpp::ClientData> data)
: client_data(std::move(data))
{
}

std::shared_ptr<rmw_zenoh_cpp::ClientData> client_data;
};

///=============================================================================
void client_data_handler(z_loaned_reply_t * reply, void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
auto wrapper = static_cast<ClientDataWrapper *>(data);
if (wrapper == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_handler."
);
return;
}

if (client_data->is_shutdown()) {
if (wrapper->client_data->is_shutdown()) {
return;
}

Expand All @@ -69,7 +79,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
client_data->topic_info().topic_keyexpr_.c_str(),
wrapper->client_data->topic_info().topic_keyexpr_.c_str(),
static_cast<int>(z_string_len(z_loan(err_str))),
z_string_data(z_loan(err_str)));
z_drop(z_move(err_str));
Expand All @@ -80,23 +90,23 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
std::chrono::nanoseconds::rep received_timestamp =
std::chrono::system_clock::now().time_since_epoch().count();

client_data->add_new_reply(
wrapper->client_data->add_new_reply(
std::make_unique<rmw_zenoh_cpp::ZenohReply>(reply, received_timestamp));
}

///=============================================================================
void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_zenoh_cpp::ClientData *>(data);
if (client_data == nullptr) {
auto wrapper = static_cast<ClientDataWrapper *>(data);
if (wrapper == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t from data in client_data_drop."
);
return;
}

client_data->decrement_in_flight_and_conditionally_remove();
delete wrapper;
}

} // namespace
Expand All @@ -105,7 +115,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ClientData> ClientData::make(
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -167,7 +177,7 @@ std::shared_ptr<ClientData> ClientData::make(

std::size_t domain_id = node_info.domain_id_;
auto entity = liveliness::Entity::make(
z_info_zid(session),
z_info_zid(session->loan()),
std::to_string(node_id),
std::to_string(service_id),
liveliness::EntityType::Client,
Expand All @@ -192,6 +202,7 @@ std::shared_ptr<ClientData> ClientData::make(
node,
client,
entity,
session,
request_members,
response_members,
request_type_support,
Expand All @@ -211,28 +222,29 @@ ClientData::ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * rmw_client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<ZenohSession> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_client_(rmw_client),
entity_(std::move(entity)),
sess_(std::move(sess)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(request_type_support),
response_type_support_(response_type_support),
wait_set_data_(nullptr),
sequence_number_(1),
is_shutdown_(false),
initialized_(false),
num_in_flight_(0)
initialized_(false)
{
// Do nothing.
}

///=============================================================================
bool ClientData::init(const z_loaned_session_t * session)
bool ClientData::init(std::shared_ptr<ZenohSession> session)
{
if (z_keyexpr_from_str(
&this->keyexpr_,
Expand All @@ -250,7 +262,7 @@ bool ClientData::init(const z_loaned_session_t * session)
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
if (z_liveliness_declare_token(
session,
session->loan(),
&this->token_,
z_loan(liveliness_ke),
NULL
Expand All @@ -266,6 +278,7 @@ bool ClientData::init(const z_loaned_session_t * session)
z_drop(z_move(this->token_));
});

sess_ = session;
initialized_ = true;

free_ros_keyexpr.cancel();
Expand Down Expand Up @@ -466,11 +479,11 @@ rmw_ret_t ClientData::send_request(

// TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures,
// capture shared_from_this() instead of this.
num_in_flight_++;
ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this());
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper);
z_get(
context_impl->session(),
sess_->loan(),
z_loan(keyexpr_), "",
z_move(zn_closure_reply),
&opts);
Expand Down Expand Up @@ -523,10 +536,11 @@ bool ClientData::detach_condition_and_queue_is_empty()
}

///=============================================================================
void ClientData::_shutdown()
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (is_shutdown_) {
return;
return RMW_RET_OK;
}

// Unregister this node from the ROS graph.
Expand All @@ -535,46 +549,12 @@ void ClientData::_shutdown()
z_drop(z_move(keyexpr_));
}

sess_.reset();
is_shutdown_ = true;
}

///=============================================================================
rmw_ret_t ClientData::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return RMW_RET_OK;
}

///=============================================================================
bool ClientData::shutdown_and_query_in_flight()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
_shutdown();
return num_in_flight_ > 0;
}

///=============================================================================
void ClientData::decrement_in_flight_and_conditionally_remove()
{
std::unique_lock<std::recursive_mutex> lock(mutex_);
--num_in_flight_;

if (is_shutdown_ && num_in_flight_ == 0) {
rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
if (context_impl == nullptr) {
return;
}
std::shared_ptr<rmw_zenoh_cpp::NodeData> node_data = context_impl->get_node_data(rmw_node_);
if (node_data == nullptr) {
return;
}
// We have to unlock here since we are about to delete ourself, and thus the unlock would be UB.
lock.unlock();
node_data->delete_client_data(rmw_client_);
}
}

///=============================================================================
bool ClientData::is_shutdown() const
{
Expand Down
17 changes: 5 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
public:
// Make a shared_ptr of ClientData.
static std::shared_ptr<ClientData> make(
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -95,12 +95,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
// Shutdown this ClientData.
rmw_ret_t shutdown();

// Shutdown this ClientData, and return whether there are any requests currently in flight.
bool shutdown_and_query_in_flight();

// Decrement the in flight requests, and if that drops to 0 remove the client from the node.
void decrement_in_flight_and_conditionally_remove();

// Check if this ClientData is shutdown.
bool is_shutdown() const;

Expand All @@ -113,16 +107,14 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_node_t * rmw_node,
const rmw_client_t * client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<ZenohSession> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(const z_loaned_session_t * session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();
bool init(std::shared_ptr<ZenohSession> session);

// Internal mutex.
mutable std::recursive_mutex mutex_;
Expand All @@ -131,6 +123,8 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
// A shared session.
std::shared_ptr<ZenohSession> sess_;
// An owned keyexpression.
z_owned_keyexpr_t keyexpr_;
// Liveliness token for the service.
Expand All @@ -152,7 +146,6 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
bool is_shutdown_;
// Whether the object has ever successfully been initialized.
bool initialized_;
size_t num_in_flight_;
};
using ClientDataPtr = std::shared_ptr<ClientData>;
using ClientDataConstPtr = std::shared_ptr<const ClientData>;
Expand Down
Loading

0 comments on commit a50350c

Please sign in to comment.