From 5be71990418fc2eddda480f03543226f3df8b97a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 16 Nov 2023 02:07:56 +0800 Subject: [PATCH] Implement basic publisher introspection Signed-off-by: Yadunund --- rmw_zenoh_cpp/package.xml | 1 - rmw_zenoh_cpp/src/detail/graph_cache.cpp | 139 ++++++++++++++++-- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 15 +- .../src/rmw_get_topic_names_and_types.cpp | 110 +++++++------- 4 files changed, 198 insertions(+), 67 deletions(-) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 259bbce6..02abec46 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -19,7 +19,6 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw - yaml_cpp_vendor ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 99403ec8..fde32033 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -256,14 +256,16 @@ GraphCache::remove_subscription(uint64_t handle) ///============================================================================= namespace { -std::vector split_keyexpr(const std::string & keyexpr) +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') { std::vector delim_idx = {}; // Insert -1 for starting position to make the split easier when using substr. delim_idx.push_back(-1); std::size_t idx = 0; for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == '/') { + if (*it == delim) { delim_idx.push_back(idx); } ++idx; @@ -319,7 +321,6 @@ std::optional> _parse_token(const std::string return std::nullopt; } - GraphNode node; // Nodes with empty namespaces will contain a "_". node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3); @@ -333,7 +334,7 @@ std::optional> _parse_token(const std::string return std::nullopt; } GraphNode::PubSubData data; - data.topic = std::move(parts[5]); + data.topic = "/" + std::move(parts[5]); data.type = std::move(parts[6]); data.qos = std::move(parts[7]); @@ -376,12 +377,12 @@ void GraphCache::parse_put(const std::string & keyexpr) auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); if (!insertion.second) { RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.", + "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.", node->name.c_str()); } } RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added node %s to the graph.", + "rmw_zenoh_cpp", "Added node /%s to the graph.", node->name.c_str()); return; @@ -400,11 +401,20 @@ void GraphCache::parse_put(const std::string & keyexpr) auto insertion = ns_it->second.insert(std::make_pair(node->name, node)); if (!insertion.second && !node->pubs.empty()) { // Node already exists so just append the publisher. - insertion.first->second->pubs.push_back(std::move(node->pubs[0])); + insertion.first->second->pubs.push_back(node->pubs[0]); } } + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; + auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), 1)); + if (!insertion.second) { + // Such a topic already exists so we just increment its count. + ++insertion.first->second; + } RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", "Added publisher to node %s in graph.", + "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.", + node->pubs.at(0).topic.c_str(), node->name.c_str()); return; } else if (entity == "MS") { @@ -440,7 +450,7 @@ void GraphCache::parse_del(const std::string & keyexpr) } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removed node %s from the graph.", + "Removed node /%s from the graph.", node->name.c_str() ); } else if (entity == "MP") { @@ -469,9 +479,22 @@ void GraphCache::parse_del(const std::string & keyexpr) } if (erase_it != found_node->pubs.end()) { found_node->pubs.erase(erase_it); + // Bookkeeping + // TODO(Yadunund): Be more systematic about generating the key. + std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type; + auto topic_it = graph_topics_.find(topic_key); + if (topic_it != graph_topics_.end()) { + if (topic_it->second == 1) { + // The last publisher was removed so we can delete this entry. + graph_topics_.erase(topic_key); + } else { + // Else we just decrement the count. + --topic_it->second; + } + } RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Removed publisher %s from node %s in the graph.", + "Removed publisher %s from node /%s in the graph.", node->pubs.at(0).topic.c_str(), node->name.c_str() ); @@ -575,9 +598,6 @@ rmw_ret_t GraphCache::get_node_names( std::move(free_enclaves_lambda)); } - // TODO(Yadunund): Remove this printout. - // const std::string & graph_str = YAML::Dump(graph_); - // RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str()); // Fill node names, namespaces and enclaves. std::size_t j = 0; for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) { @@ -612,3 +632,96 @@ rmw_ret_t GraphCache::get_node_names( return RMW_RET_OK; } + +///============================================================================= +rmw_ret_t GraphCache::get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + static_cast(no_demangle); + RCUTILS_CHECK_ALLOCATOR_WITH_MSG( + allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT); + + std::lock_guard lock(graph_mutex_); + const size_t topic_number = graph_topics_.size(); + // TODO(Yadunund): Delete. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "topic_number %ld", topic_number); + + rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator); + if (ret != RMW_RET_OK) { + return ret; + } + auto cleanup_names_and_types = rcpputils::make_scope_exit( + [topic_names_and_types] { + rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); + if (fail_ret != RMW_RET_OK) { + RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); + } + }); + + // topic_names_and_types->names is an rcutils_string_array_t, + // while topic_names_and_types->types is an rcutils_string_array_t * + + // rcutils_ret_t rcutils_ret = + // rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator); + // if (rcutils_ret != RCUTILS_RET_OK) { + // return RMW_RET_BAD_ALLOC; + // } + auto free_topic_names = rcpputils::make_scope_exit( + [names = &topic_names_and_types->names]() { + rcutils_ret_t ret = rcutils_string_array_fini(names); + if (ret != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string().str); + } + }); + + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + return RMW_RET_BAD_ALLOC; + } + auto free_topic_types = rcpputils::make_scope_exit( + [types = topic_names_and_types->types]() { + rcutils_ret_t ret = rcutils_string_array_fini(types); + if (ret != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string().str); + } + }); + + // Fill topic names and types. + std::size_t j = 0; + for (const auto & it : graph_topics_) { + // Split based on "?". + // TODO(Yadunund): Be more systematic about this. + std::vector parts = split_keyexpr(it.first, '?'); + if (parts.size() < 2) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Invalid topic_key %s", it.first.c_str()); + return RMW_RET_INVALID_ARGUMENT; + } + topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator); + if (!topic_names_and_types->names.data[j]) { + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types->data[j] = rcutils_strdup( + parts.at(1).c_str(), *allocator); + if (!topic_names_and_types->types->data[j]) { + return RMW_RET_BAD_ALLOC; + } + ++j; + } + + cleanup_names_and_types.cancel(); + free_topic_names.cancel(); + free_topic_types.cancel(); + + return RMW_RET_OK; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 0e0c4c14..78336704 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -21,14 +21,15 @@ #include #include #include +#include #include #include "rcutils/allocator.h" #include "rcutils/types.h" #include "rmw/rmw.h" +#include "rmw/names_and_types.h" -#include "yaml-cpp/yaml.h" ///============================================================================= class GenerateToken @@ -152,6 +153,11 @@ class GraphCache final rcutils_string_array_t * enclaves, rcutils_allocator_t * allocator) const; + rmw_ret_t get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types); + private: std::mutex publishers_mutex_; uint64_t publishers_handle_id_{0}; @@ -183,7 +189,12 @@ class GraphCache final node_n: */ // Map namespace to a map of . - std::unordered_map> graph_; + std::unordered_map> graph_ = {}; + // Optimize published topic lookups by caching an unordered_map + // where key is topic_name+topic_type and the value the count. + // TODO(Yadunund): Use a single map for both published and subscribed topics. + // Consider changing value to pair for pub,sub resp. + std::unordered_map graph_topics_ = {}; mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index 7e4f2645..a70b7cb4 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "detail/rmw_data_types.hpp" + #include "rcutils/strdup.h" #include "rmw/error_handling.h" @@ -31,63 +33,69 @@ rmw_get_topic_names_and_types( bool no_demangle, rmw_names_and_types_t * topic_names_and_types) { - static_cast(node); - static_cast(no_demangle); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); + + return node->context->impl->graph_cache.get_topic_names_and_types( + allocator, no_demangle, topic_names_and_types); - rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); - if (ret != RMW_RET_OK) { - return ret; - } - auto cleanup_names_and_types = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); - } - }); + // rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator); + // if (ret != RMW_RET_OK) { + // return ret; + // } + // auto cleanup_names_and_types = rcpputils::make_scope_exit( + // [topic_names_and_types] { + // rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types); + // if (fail_ret != RMW_RET_OK) { + // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling"); + // } + // }); - // topic_names_and_types->names is an rcutils_string_array_t, - // while topic_names_and_types->types is an rcutils_string_array_t * + // // topic_names_and_types->names is an rcutils_string_array_t, + // // while topic_names_and_types->types is an rcutils_string_array_t * - topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); - if (topic_names_and_types->names.data[0] == nullptr) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); - return RMW_RET_BAD_ALLOC; - } - auto free_names = rcpputils::make_scope_exit( - [topic_names_and_types, allocator] { - allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); - }); + // topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator); + // if (topic_names_and_types->names.data[0] == nullptr) { + // RMW_SET_ERROR_MSG("failed to allocate memory for topic names"); + // return RMW_RET_BAD_ALLOC; + // } + // auto free_names = rcpputils::make_scope_exit( + // [topic_names_and_types, allocator] { + // allocator->deallocate(topic_names_and_types->names.data[0], allocator->state); + // }); - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - topic_names_and_types->types, 1, - allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); - return RMW_RET_BAD_ALLOC; - } - auto fini_string_array = rcpputils::make_scope_exit( - [topic_names_and_types] { - rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); - if (fail_ret != RMW_RET_OK) { - RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); - } - }); + // rcutils_ret_t rcutils_ret = rcutils_string_array_init( + // topic_names_and_types->types, 1, + // allocator); + // if (rcutils_ret != RCUTILS_RET_OK) { + // RMW_SET_ERROR_MSG("failed to allocate memory for topic types"); + // return RMW_RET_BAD_ALLOC; + // } + // auto fini_string_array = rcpputils::make_scope_exit( + // [topic_names_and_types] { + // rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types); + // if (fail_ret != RMW_RET_OK) { + // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling"); + // } + // }); - topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); - if (topic_names_and_types->types[0].data[0] == nullptr) { - RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); - return RMW_RET_BAD_ALLOC; - } - auto free_types = rcpputils::make_scope_exit( - [topic_names_and_types, allocator] { - allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); - }); + // topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator); + // if (topic_names_and_types->types[0].data[0] == nullptr) { + // RMW_SET_ERROR_MSG("failed to allocate memory for topic data"); + // return RMW_RET_BAD_ALLOC; + // } + // auto free_types = rcpputils::make_scope_exit( + // [topic_names_and_types, allocator] { + // allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state); + // }); - free_types.cancel(); - fini_string_array.cancel(); - free_names.cancel(); - cleanup_names_and_types.cancel(); + // free_types.cancel(); + // fini_string_array.cancel(); + // free_names.cancel(); + // cleanup_names_and_types.cancel(); return RMW_RET_OK; }