diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml
index 259bbce6..02abec46 100644
--- a/rmw_zenoh_cpp/package.xml
+++ b/rmw_zenoh_cpp/package.xml
@@ -19,7 +19,6 @@
rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp
rmw
- yaml_cpp_vendor
ament_lint_auto
ament_lint_common
diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp
index 99403ec8..fde32033 100644
--- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp
+++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp
@@ -256,14 +256,16 @@ GraphCache::remove_subscription(uint64_t handle)
///=============================================================================
namespace
{
-std::vector split_keyexpr(const std::string & keyexpr)
+std::vector split_keyexpr(
+ const std::string & keyexpr,
+ const char delim = '/')
{
std::vector delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
delim_idx.push_back(-1);
std::size_t idx = 0;
for (auto it = keyexpr.begin(); it != keyexpr.end(); ++it) {
- if (*it == '/') {
+ if (*it == delim) {
delim_idx.push_back(idx);
}
++idx;
@@ -319,7 +321,6 @@ std::optional> _parse_token(const std::string
return std::nullopt;
}
-
GraphNode node;
// Nodes with empty namespaces will contain a "_".
node.ns = parts.at(3) == "_" ? "/" : "/" + parts.at(3);
@@ -333,7 +334,7 @@ std::optional> _parse_token(const std::string
return std::nullopt;
}
GraphNode::PubSubData data;
- data.topic = std::move(parts[5]);
+ data.topic = "/" + std::move(parts[5]);
data.type = std::move(parts[6]);
data.qos = std::move(parts[7]);
@@ -376,12 +377,12 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second) {
RCUTILS_LOG_WARN_NAMED(
- "rmw_zenoh_cpp", "Unable to add duplicate node %s to the graph.",
+ "rmw_zenoh_cpp", "Unable to add duplicate node /%s to the graph.",
node->name.c_str());
}
}
RCUTILS_LOG_WARN_NAMED(
- "rmw_zenoh_cpp", "Added node %s to the graph.",
+ "rmw_zenoh_cpp", "Added node /%s to the graph.",
node->name.c_str());
return;
@@ -400,11 +401,20 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto insertion = ns_it->second.insert(std::make_pair(node->name, node));
if (!insertion.second && !node->pubs.empty()) {
// Node already exists so just append the publisher.
- insertion.first->second->pubs.push_back(std::move(node->pubs[0]));
+ insertion.first->second->pubs.push_back(node->pubs[0]);
}
}
+ // Bookkeeping
+ // TODO(Yadunund): Be more systematic about generating the key.
+ std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type;
+ auto insertion = graph_topics_.insert(std::make_pair(std::move(topic_key), 1));
+ if (!insertion.second) {
+ // Such a topic already exists so we just increment its count.
+ ++insertion.first->second;
+ }
RCUTILS_LOG_WARN_NAMED(
- "rmw_zenoh_cpp", "Added publisher to node %s in graph.",
+ "rmw_zenoh_cpp", "Added publisher %s to node /%s in graph.",
+ node->pubs.at(0).topic.c_str(),
node->name.c_str());
return;
} else if (entity == "MS") {
@@ -440,7 +450,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
- "Removed node %s from the graph.",
+ "Removed node /%s from the graph.",
node->name.c_str()
);
} else if (entity == "MP") {
@@ -469,9 +479,22 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
if (erase_it != found_node->pubs.end()) {
found_node->pubs.erase(erase_it);
+ // Bookkeeping
+ // TODO(Yadunund): Be more systematic about generating the key.
+ std::string topic_key = node->pubs.at(0).topic + "?" + node->pubs.at(0).type;
+ auto topic_it = graph_topics_.find(topic_key);
+ if (topic_it != graph_topics_.end()) {
+ if (topic_it->second == 1) {
+ // The last publisher was removed so we can delete this entry.
+ graph_topics_.erase(topic_key);
+ } else {
+ // Else we just decrement the count.
+ --topic_it->second;
+ }
+ }
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
- "Removed publisher %s from node %s in the graph.",
+ "Removed publisher %s from node /%s in the graph.",
node->pubs.at(0).topic.c_str(),
node->name.c_str()
);
@@ -575,9 +598,6 @@ rmw_ret_t GraphCache::get_node_names(
std::move(free_enclaves_lambda));
}
- // TODO(Yadunund): Remove this printout.
- // const std::string & graph_str = YAML::Dump(graph_);
- // RCUTILS_LOG_WARN_NAMED("rmw_zenoh_cpp", "[graph]\n%s\n", graph_str.c_str());
// Fill node names, namespaces and enclaves.
std::size_t j = 0;
for (auto ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) {
@@ -612,3 +632,96 @@ rmw_ret_t GraphCache::get_node_names(
return RMW_RET_OK;
}
+
+///=============================================================================
+rmw_ret_t GraphCache::get_topic_names_and_types(
+ rcutils_allocator_t * allocator,
+ bool no_demangle,
+ rmw_names_and_types_t * topic_names_and_types)
+{
+ static_cast(no_demangle);
+ RCUTILS_CHECK_ALLOCATOR_WITH_MSG(
+ allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT);
+
+ std::lock_guard lock(graph_mutex_);
+ const size_t topic_number = graph_topics_.size();
+ // TODO(Yadunund): Delete.
+ RCUTILS_LOG_WARN_NAMED(
+ "rmw_zenoh_cpp",
+ "topic_number %ld", topic_number);
+
+ rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, topic_number, allocator);
+ if (ret != RMW_RET_OK) {
+ return ret;
+ }
+ auto cleanup_names_and_types = rcpputils::make_scope_exit(
+ [topic_names_and_types] {
+ rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
+ if (fail_ret != RMW_RET_OK) {
+ RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
+ }
+ });
+
+ // topic_names_and_types->names is an rcutils_string_array_t,
+ // while topic_names_and_types->types is an rcutils_string_array_t *
+
+ // rcutils_ret_t rcutils_ret =
+ // rcutils_string_array_init(topic_names_and_types->names, topic_number, allocator);
+ // if (rcutils_ret != RCUTILS_RET_OK) {
+ // return RMW_RET_BAD_ALLOC;
+ // }
+ auto free_topic_names = rcpputils::make_scope_exit(
+ [names = &topic_names_and_types->names]() {
+ rcutils_ret_t ret = rcutils_string_array_fini(names);
+ if (ret != RCUTILS_RET_OK) {
+ RCUTILS_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "failed to cleanup during error handling: %s", rcutils_get_error_string().str);
+ }
+ });
+
+ rcutils_ret_t rcutils_ret =
+ rcutils_string_array_init(topic_names_and_types->types, topic_number, allocator);
+ if (rcutils_ret != RCUTILS_RET_OK) {
+ return RMW_RET_BAD_ALLOC;
+ }
+ auto free_topic_types = rcpputils::make_scope_exit(
+ [types = topic_names_and_types->types]() {
+ rcutils_ret_t ret = rcutils_string_array_fini(types);
+ if (ret != RCUTILS_RET_OK) {
+ RCUTILS_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "failed to cleanup during error handling: %s", rcutils_get_error_string().str);
+ }
+ });
+
+ // Fill topic names and types.
+ std::size_t j = 0;
+ for (const auto & it : graph_topics_) {
+ // Split based on "?".
+ // TODO(Yadunund): Be more systematic about this.
+ std::vector parts = split_keyexpr(it.first, '?');
+ if (parts.size() < 2) {
+ RCUTILS_LOG_ERROR_NAMED(
+ "rmw_zenoh_cpp",
+ "Invalid topic_key %s", it.first.c_str());
+ return RMW_RET_INVALID_ARGUMENT;
+ }
+ topic_names_and_types->names.data[j] = rcutils_strdup(parts.at(0).c_str(), *allocator);
+ if (!topic_names_and_types->names.data[j]) {
+ return RMW_RET_BAD_ALLOC;
+ }
+ topic_names_and_types->types->data[j] = rcutils_strdup(
+ parts.at(1).c_str(), *allocator);
+ if (!topic_names_and_types->types->data[j]) {
+ return RMW_RET_BAD_ALLOC;
+ }
+ ++j;
+ }
+
+ cleanup_names_and_types.cancel();
+ free_topic_names.cancel();
+ free_topic_types.cancel();
+
+ return RMW_RET_OK;
+}
diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp
index 0e0c4c14..78336704 100644
--- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp
+++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp
@@ -21,14 +21,15 @@
#include
#include
#include
+#include
#include
#include "rcutils/allocator.h"
#include "rcutils/types.h"
#include "rmw/rmw.h"
+#include "rmw/names_and_types.h"
-#include "yaml-cpp/yaml.h"
///=============================================================================
class GenerateToken
@@ -152,6 +153,11 @@ class GraphCache final
rcutils_string_array_t * enclaves,
rcutils_allocator_t * allocator) const;
+ rmw_ret_t get_topic_names_and_types(
+ rcutils_allocator_t * allocator,
+ bool no_demangle,
+ rmw_names_and_types_t * topic_names_and_types);
+
private:
std::mutex publishers_mutex_;
uint64_t publishers_handle_id_{0};
@@ -183,7 +189,12 @@ class GraphCache final
node_n:
*/
// Map namespace to a map of .
- std::unordered_map> graph_;
+ std::unordered_map> graph_ = {};
+ // Optimize published topic lookups by caching an unordered_map
+ // where key is topic_name+topic_type and the value the count.
+ // TODO(Yadunund): Use a single map for both published and subscribed topics.
+ // Consider changing value to pair for pub,sub resp.
+ std::unordered_map graph_topics_ = {};
mutable std::mutex graph_mutex_;
};
diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
index 7e4f2645..a70b7cb4 100644
--- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
+++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include "detail/rmw_data_types.hpp"
+
#include "rcutils/strdup.h"
#include "rmw/error_handling.h"
@@ -31,63 +33,69 @@ rmw_get_topic_names_and_types(
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types)
{
- static_cast(node);
- static_cast(no_demangle);
+ RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
+ RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
+ RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
+ RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
+ RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT);
+
+ return node->context->impl->graph_cache.get_topic_names_and_types(
+ allocator, no_demangle, topic_names_and_types);
- rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator);
- if (ret != RMW_RET_OK) {
- return ret;
- }
- auto cleanup_names_and_types = rcpputils::make_scope_exit(
- [topic_names_and_types] {
- rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
- if (fail_ret != RMW_RET_OK) {
- RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
- }
- });
+ // rmw_ret_t ret = rmw_names_and_types_init(topic_names_and_types, 1, allocator);
+ // if (ret != RMW_RET_OK) {
+ // return ret;
+ // }
+ // auto cleanup_names_and_types = rcpputils::make_scope_exit(
+ // [topic_names_and_types] {
+ // rmw_ret_t fail_ret = rmw_names_and_types_fini(topic_names_and_types);
+ // if (fail_ret != RMW_RET_OK) {
+ // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup names and types during error handling");
+ // }
+ // });
- // topic_names_and_types->names is an rcutils_string_array_t,
- // while topic_names_and_types->types is an rcutils_string_array_t *
+ // // topic_names_and_types->names is an rcutils_string_array_t,
+ // // while topic_names_and_types->types is an rcutils_string_array_t *
- topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator);
- if (topic_names_and_types->names.data[0] == nullptr) {
- RMW_SET_ERROR_MSG("failed to allocate memory for topic names");
- return RMW_RET_BAD_ALLOC;
- }
- auto free_names = rcpputils::make_scope_exit(
- [topic_names_and_types, allocator] {
- allocator->deallocate(topic_names_and_types->names.data[0], allocator->state);
- });
+ // topic_names_and_types->names.data[0] = rcutils_strdup("/chatter", *allocator);
+ // if (topic_names_and_types->names.data[0] == nullptr) {
+ // RMW_SET_ERROR_MSG("failed to allocate memory for topic names");
+ // return RMW_RET_BAD_ALLOC;
+ // }
+ // auto free_names = rcpputils::make_scope_exit(
+ // [topic_names_and_types, allocator] {
+ // allocator->deallocate(topic_names_and_types->names.data[0], allocator->state);
+ // });
- rcutils_ret_t rcutils_ret = rcutils_string_array_init(
- topic_names_and_types->types, 1,
- allocator);
- if (rcutils_ret != RCUTILS_RET_OK) {
- RMW_SET_ERROR_MSG("failed to allocate memory for topic types");
- return RMW_RET_BAD_ALLOC;
- }
- auto fini_string_array = rcpputils::make_scope_exit(
- [topic_names_and_types] {
- rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types);
- if (fail_ret != RMW_RET_OK) {
- RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling");
- }
- });
+ // rcutils_ret_t rcutils_ret = rcutils_string_array_init(
+ // topic_names_and_types->types, 1,
+ // allocator);
+ // if (rcutils_ret != RCUTILS_RET_OK) {
+ // RMW_SET_ERROR_MSG("failed to allocate memory for topic types");
+ // return RMW_RET_BAD_ALLOC;
+ // }
+ // auto fini_string_array = rcpputils::make_scope_exit(
+ // [topic_names_and_types] {
+ // rmw_ret_t fail_ret = rcutils_string_array_fini(topic_names_and_types->types);
+ // if (fail_ret != RMW_RET_OK) {
+ // RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup topic types during error handling");
+ // }
+ // });
- topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator);
- if (topic_names_and_types->types[0].data[0] == nullptr) {
- RMW_SET_ERROR_MSG("failed to allocate memory for topic data");
- return RMW_RET_BAD_ALLOC;
- }
- auto free_types = rcpputils::make_scope_exit(
- [topic_names_and_types, allocator] {
- allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state);
- });
+ // topic_names_and_types->types[0].data[0] = rcutils_strdup("std_msgs/msg/String", *allocator);
+ // if (topic_names_and_types->types[0].data[0] == nullptr) {
+ // RMW_SET_ERROR_MSG("failed to allocate memory for topic data");
+ // return RMW_RET_BAD_ALLOC;
+ // }
+ // auto free_types = rcpputils::make_scope_exit(
+ // [topic_names_and_types, allocator] {
+ // allocator->deallocate(topic_names_and_types->types[0].data[0], allocator->state);
+ // });
- free_types.cancel();
- fini_string_array.cancel();
- free_names.cancel();
- cleanup_names_and_types.cancel();
+ // free_types.cancel();
+ // fini_string_array.cancel();
+ // free_names.cancel();
+ // cleanup_names_and_types.cancel();
return RMW_RET_OK;
}