From 68b4d9a3d1a08fe00e1224a66fe8fa066138da6f Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 19 Dec 2024 02:12:19 -0500 Subject: [PATCH] Use weak pointers in rmw_context_impl_s. (#345) Signed-off-by: Chris Lalancette --- .../src/detail/rmw_context_impl_s.cpp | 59 +++++++------------ 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 206d2d3f..61dd9c8b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -43,17 +43,10 @@ // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -// This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler() -// to lookup the pointer, and gain a reference to a shared_ptr if it exists. -// This guarantees that the Data object will not be destroyed while we are using it. -static std::mutex data_to_data_shared_ptr_map_mutex; -static std::unordered_map> data_to_data_shared_ptr_map; - // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing // "this" ptr by reference. -class rmw_context_impl_s::Data final +class rmw_context_impl_s::Data final : public std::enable_shared_from_this { public: // Constructor. @@ -64,7 +57,8 @@ class rmw_context_impl_s::Data final enclave_(std::move(enclave)), is_shutdown_(false), next_entity_id_(0), - nodes_({}) + nodes_({}), + liveliness_keyexpr_(rmw_zenoh_cpp::liveliness::subscription_token(domain_id)) { // Initialize the zenoh configuration. std::optional config = rmw_zenoh_cpp::get_z_config( @@ -127,8 +121,6 @@ class rmw_context_impl_s::Data final graph_cache_ = std::make_shared(this->session_->get_zid()); // Setup liveliness subscriptions for discovery. - std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); - // Query router/liveliness participants to get graph information before the session was started. // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive // replies for the z_liveliness_get() call. This is necessary as if the `bound` @@ -147,14 +139,12 @@ class rmw_context_impl_s::Data final // Intuitively use a FIFO channel rather than building it up with a closure and a // handler to FIFO channel - zenoh::KeyExpr keyexpr(liveliness_str); - zenoh::Session::GetOptions options = zenoh::Session::GetOptions::create_default(); options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL; options.payload = ""; auto replies = session_->liveliness_get( - keyexpr, + liveliness_keyexpr_, zenoh::channels::FifoChannel(SIZE_MAX - 1), zenoh::Session::LivelinessGetOptions::create_default(), &result); @@ -193,29 +183,29 @@ class rmw_context_impl_s::Data final graph_guard_condition_ = std::make_unique(); graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; graph_guard_condition_->data = &guard_condition_data_; + } + void init() + { // Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. - zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str()); zenoh::Session::LivelinessSubscriberOptions sub_options = zenoh::Session::LivelinessSubscriberOptions::create_default(); sub_options.history = true; - graph_subscriber_ = session_->liveliness_declare_subscriber( - keyexpr_cpp, - [&](const zenoh::Sample & sample) { - // Look up the data shared_ptr in the global map. If it is in there, use it. - // If not, it is being shutdown so we can just ignore this update. - std::shared_ptr data_shared_ptr{nullptr}; - { - std::lock_guard lk(data_to_data_shared_ptr_map_mutex); - if (data_to_data_shared_ptr_map.count(this) == 0) { - return; - } - data_shared_ptr = data_to_data_shared_ptr_map[this]; - } + // This can't be in the constructor since shared_from_this() doesn't work there. + std::weak_ptr context_impl_data_wp = shared_from_this(); + zenoh::ZResult result; + graph_subscriber_ = session_->liveliness_declare_subscriber( + liveliness_keyexpr_, + [context_impl_data_wp](const zenoh::Sample & sample) { // Update the graph cache. - data_shared_ptr->update_graph_cache( + std::shared_ptr context_impl_data = context_impl_data_wp.lock(); + if (context_impl_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to obtain context_impl."); + return; + } + context_impl_data->update_graph_cache( sample, std::string(sample.get_keyexpr().as_string_view())); }, @@ -435,6 +425,8 @@ class rmw_context_impl_s::Data final std::size_t next_entity_id_; // Nodes created from this context. std::unordered_map> nodes_; + + zenoh::KeyExpr liveliness_keyexpr_; }; ///============================================================================= @@ -443,9 +435,7 @@ rmw_context_impl_s::rmw_context_impl_s( const std::string & enclave) { data_ = std::make_shared(domain_id, std::move(enclave)); - - std::lock_guard lk(data_to_data_shared_ptr_map_mutex); - data_to_data_shared_ptr_map.emplace(data_.get(), data_); + data_->init(); } ///============================================================================= @@ -487,11 +477,6 @@ std::size_t rmw_context_impl_s::get_next_entity_id() ///============================================================================= rmw_ret_t rmw_context_impl_s::shutdown() { - { - std::lock_guard lk(data_to_data_shared_ptr_map_mutex); - data_to_data_shared_ptr_map.erase(data_.get()); - } - return data_->shutdown(); }