diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index f202ad29..3ac72b0c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -43,11 +43,21 @@ namespace { +struct ClientDataWrapper +{ + explicit ClientDataWrapper(std::shared_ptr data) + : client_data(data) + { + } + + std::shared_ptr client_data; +}; + ///============================================================================= void client_data_handler(z_loaned_reply_t * reply, void * data) { - auto client_data = static_cast(data); - if (client_data == nullptr) { + auto wrapper = static_cast(data); + if (wrapper == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain client_data_t from data in client_data_handler." @@ -55,7 +65,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) return; } - if (client_data->is_shutdown()) { + if (wrapper->client_data->is_shutdown()) { return; } @@ -69,7 +79,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - client_data->topic_info().topic_keyexpr_.c_str(), + wrapper->client_data->topic_info().topic_keyexpr_.c_str(), static_cast(z_string_len(z_loan(err_str))), z_string_data(z_loan(err_str))); z_drop(z_move(err_str)); @@ -80,15 +90,15 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) std::chrono::nanoseconds::rep received_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); - client_data->add_new_reply( + wrapper->client_data->add_new_reply( std::make_unique(reply, received_timestamp)); } ///============================================================================= void client_data_drop(void * data) { - auto client_data = static_cast(data); - if (client_data == nullptr) { + auto wrapper = static_cast(data); + if (wrapper == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to obtain client_data_t from data in client_data_drop." @@ -96,7 +106,7 @@ void client_data_drop(void * data) return; } - client_data->decrement_in_flight_and_conditionally_remove(); + delete wrapper; } } // namespace @@ -228,8 +238,7 @@ ClientData::ClientData( wait_set_data_(nullptr), sequence_number_(1), is_shutdown_(false), - initialized_(false), - num_in_flight_(0) + initialized_(false) { // Do nothing. } @@ -470,9 +479,9 @@ rmw_ret_t ClientData::send_request( // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. - num_in_flight_++; + ClientDataWrapper * wrapper = new ClientDataWrapper(shared_from_this()); z_owned_closure_reply_t zn_closure_reply; - z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); + z_closure(&zn_closure_reply, client_data_handler, client_data_drop, wrapper); z_get( sess_->loan(), z_loan(keyexpr_), "", @@ -527,10 +536,11 @@ bool ClientData::detach_condition_and_queue_is_empty() } ///============================================================================= -void ClientData::_shutdown() +rmw_ret_t ClientData::shutdown() { + std::lock_guard lock(mutex_); if (is_shutdown_) { - return; + return RMW_RET_OK; } // Unregister this node from the ROS graph. @@ -541,45 +551,10 @@ void ClientData::_shutdown() sess_.reset(); is_shutdown_ = true; -} -///============================================================================= -rmw_ret_t ClientData::shutdown() -{ - std::lock_guard lock(mutex_); - _shutdown(); return RMW_RET_OK; } -///============================================================================= -bool ClientData::shutdown_and_query_in_flight() -{ - std::lock_guard lock(mutex_); - _shutdown(); - return num_in_flight_ > 0; -} - -///============================================================================= -void ClientData::decrement_in_flight_and_conditionally_remove() -{ - std::unique_lock lock(mutex_); - --num_in_flight_; - - if (is_shutdown_ && num_in_flight_ == 0) { - rmw_context_impl_s * context_impl = static_cast(rmw_node_->data); - if (context_impl == nullptr) { - return; - } - std::shared_ptr node_data = context_impl->get_node_data(rmw_node_); - if (node_data == nullptr) { - return; - } - // We have to unlock here since we are about to delete ourself, and thus the unlock would be UB. - lock.unlock(); - node_data->delete_client_data(rmw_client_); - } -} - ///============================================================================= bool ClientData::is_shutdown() const { diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index 349a26db..f582f539 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -95,12 +95,6 @@ class ClientData final : public std::enable_shared_from_this // Shutdown this ClientData. rmw_ret_t shutdown(); - // Shutdown this ClientData, and return whether there are any requests currently in flight. - bool shutdown_and_query_in_flight(); - - // Decrement the in flight requests, and if that drops to 0 remove the client from the node. - void decrement_in_flight_and_conditionally_remove(); - // Check if this ClientData is shutdown. bool is_shutdown() const; @@ -122,9 +116,6 @@ class ClientData final : public std::enable_shared_from_this // Initialize the Zenoh objects for this entity. bool init(std::shared_ptr session); - // Shutdown this client (the mutex is expected to be held by the caller). - void _shutdown(); - // Internal mutex. mutable std::recursive_mutex mutex_; // The parent node. @@ -155,7 +146,6 @@ class ClientData final : public std::enable_shared_from_this bool is_shutdown_; // Whether the object has ever successfully been initialized. bool initialized_; - size_t num_in_flight_; }; using ClientDataPtr = std::shared_ptr; using ClientDataConstPtr = std::shared_ptr; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 1255de21..52a822fb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -387,9 +387,7 @@ void NodeData::delete_client_data(const rmw_client_t * const client) if (client_it == clients_.end()) { return; } - if (!client_it->second->shutdown_and_query_in_flight()) { - clients_.erase(client); - } + clients_.erase(client); } ///=============================================================================