From 1a3b4598f634ea3fe792070f329e08cab6106c42 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 11 Jan 2024 17:24:33 +0800 Subject: [PATCH 01/31] Create querying subscriber if qos is transient local Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 2 + rmw_zenoh_cpp/package.xml | 1 + rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 4 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 103 +++++++++++++------- 4 files changed, 75 insertions(+), 35 deletions(-) diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 24b70723..85230a8f 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) +find_package(rmw_dds_common REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) @@ -57,6 +58,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw + rmw_dds_common::rmw_dds_common_library zenohc::lib ) diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index 02abec46..26b2a946 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -19,6 +19,7 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw + rmw_dds_common ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index de45faba..3df33b9d 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "rcutils/allocator.h" @@ -110,7 +111,8 @@ struct saved_msg_data ///============================================================================== struct rmw_subscription_data_t { - z_owned_subscriber_t sub; + // An owned subscriber or querying_subscriber depending on the QoS settings. + std::variant sub; // Liveliness token for the subscription. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5653fae6..b37f3445 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -49,6 +49,8 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" +#include "rmw_dds_common/qos.hpp" + namespace { @@ -1137,20 +1139,15 @@ rmw_create_subscription( } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); // Adapt any 'best available' QoS options - // rmw_qos_profile_t adapted_qos_profile = *qos_profile; - // rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( - // node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); - // if (RMW_RET_OK != ret) { - // return nullptr; - // } + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( + node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); - // Check RMW QoS - // if (!is_valid_qos(*qos_profile)) { - // RMW_SET_ERROR_MSG("create_subscription() called with invalid QoS"); - // return nullptr; - // } - const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { // error was already set by find_message_type_support @@ -1216,15 +1213,6 @@ rmw_create_subscription( rmw_subscription_data_t); }); - // Set the reliability of the subscription options based on qos_profile. - // The default options will be initialized with Best Effort reliability. - auto sub_options = z_subscriber_options_default(); - sub_data->reliable = false; - if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; - } - sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -1291,19 +1279,54 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - sub_data->sub = z_declare_subscriber( - z_loan(context_impl->session), - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(sub_data->sub)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return nullptr; + // Instantiate the subscription with suitable options depending on the + // adapted_qos_profile. + // TODO(Yadunund): Rely on a separate function to return the sub + // as we start supporting more qos settings. + sub_data->reliable = false; + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); + if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + sub_data->reliable = true; + sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE; + } + sub_data->sub = ze_declare_querying_subscriber( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } } + // Create a regular subscriber for all other durability settings. + else { + z_subscriber_options_t sub_options = z_subscriber_options_default(); + if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + sub_data->reliable = true; + } + sub_data->sub = z_declare_subscriber( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } + } + auto undeclare_z_sub = rcpputils::make_scope_exit( [sub_data]() { - z_undeclare_subscriber(z_move(sub_data->sub)); + // TODO(Yadunund): Check if this is okay or if it is better + // to cast into explicit types and call appropriate undeclare method. + // See rmw_destroy_subscription() + z_drop(z_move(sub_data->sub)); }); // Publish to the graph that a new subscription is in town @@ -1380,9 +1403,19 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); - if (z_undeclare_subscriber(z_move(sub_data->sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - ret = RMW_RET_ERROR; + z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); + if (sub != nullptr) { + if (z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + ret = RMW_RET_ERROR; + } + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_data->sub); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + ret = RMW_RET_ERROR; + } } RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); @@ -1429,6 +1462,8 @@ rmw_subscription_get_actual_qos( qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + qos->durability = std::holds_alternative(sub_data->sub) ? + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; return RMW_RET_OK; } From 3f5a607e8ece6a247be824638e9abcd9e4e25711 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 11 Jan 2024 19:03:47 +0800 Subject: [PATCH 02/31] Update graph cache and introspection methods with qos Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 21 ++- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 135 +++++++++++++----- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 6 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 20 +-- 4 files changed, 125 insertions(+), 57 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b34e68d4..8e390d62 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -175,11 +175,10 @@ void GraphCache::parse_put(const std::string & keyexpr) RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", - "Added %s on topic %s with type %s and qos %s to node /%s.", + "Added %s on topic %s with type %s to node /%s.", entity_desc.c_str(), topic_info.name_.c_str(), topic_info.type_.c_str(), - topic_info.qos_.c_str(), graph_node.name_.c_str()); }; @@ -412,11 +411,10 @@ void GraphCache::parse_del(const std::string & keyexpr) RCUTILS_LOG_INFO_NAMED( "rmw_zenoh_cpp", - "Removed %s on topic %s with type %s and qos %s to node /%s.", + "Removed %s on topic %s with type %s to node /%s.", entity_desc.c_str(), topic_info.name_.c_str(), topic_info.type_.c_str(), - topic_info.qos_.c_str(), graph_node.name_.c_str()); }; @@ -927,7 +925,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( entity_type == EntityType::Publisher ? nodes[i]->pubs_ : nodes[i]->subs_; const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second; - for (const auto & [topic_data, _] : topic_data_map) { + for (const auto & [topic_type, topic_data] : topic_data_map) { rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; endpoint_info = rmw_get_zero_initialized_topic_endpoint_info(); @@ -949,7 +947,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ret = rmw_topic_endpoint_info_set_topic_type( &endpoint_info, - _demangle_if_ros_type(topic_data).c_str(), + _demangle_if_ros_type(topic_type).c_str(), allocator); if (RMW_RET_OK != ret) { return ret; @@ -961,7 +959,16 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( if (RMW_RET_OK != ret) { return ret; } - // TODO(Yadunund): Set type_hash, qos_profile, gid. + + ret = rmw_topic_endpoint_info_set_qos_profile( + &endpoint_info, + &topic_data->info_.qos_ + ); + if (RMW_RET_OK != ret) { + return ret; + } + + // TODO(Yadunund): Set type_hash, gid. } } diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 7118b64b..66aab243 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -47,7 +47,7 @@ NodeInfo::NodeInfo( TopicInfo::TopicInfo( std::string name, std::string type, - std::string qos) + rmw_qos_profile_t qos) : name_(std::move(name)), type_(std::move(type)), qos_(std::move(qos)) @@ -67,6 +67,7 @@ static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; static const char SLASH_REPLACEMENT = '%'; +static const char QOS_DELIMITER = ':'; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -84,6 +85,12 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; +// Map string literals to qos enums. +// static const std::unordered_map str_to_qos_e = { +// {"reliable", RMW_QOS_POLICY_RELIABILITY_RELIABLE}, +// {"best_effort", RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT} +// }; + std::string zid_to_str(z_id_t id) { std::stringstream ss; @@ -96,6 +103,84 @@ std::string zid_to_str(z_id_t id) return ss.str(); } +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') +{ + std::vector delim_idx = {}; + // Insert -1 for starting position to make the split easier when using substr. + delim_idx.push_back(-1); + std::size_t idx = 0; + for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { + if (*it == delim) { + delim_idx.push_back(idx); + } + ++idx; + } + std::vector result = {}; + try { + for (std::size_t i = 1; i < delim_idx.size(); ++i) { + const size_t prev_idx = delim_idx[i - 1]; + const size_t idx = delim_idx[i]; + result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); + } + } catch (const std::exception & e) { + printf("%s\n", e.what()); + return {}; + } + // Finally add the last substr. + result.push_back(keyexpr.substr(delim_idx.back() + 1)); + return result; +} + +/** + * Convert a rmw_qos_profile_t to a string with format: + * + * ::," + * Where: + * - "reliable" or "best_effort". + * - "volatile" or "transient_local". + * - "keep_last". + * - The depth number. + */ +// TODO(Yadunund): Rely on maps to retrieve strings. +std::string qos_to_keyexpr(rmw_qos_profile_t qos) +{ + std::string keyexpr = ""; + keyexpr += qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE ? "reliable" : "best_effort"; + keyexpr += QOS_DELIMITER; + keyexpr += qos.durability == + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile"; + keyexpr += QOS_DELIMITER; + // TODO(Yadunund): Update once we properly support History. + keyexpr += "keep_last,"; + keyexpr += std::to_string(qos.depth); + return keyexpr; +} + +/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. +std::optional keyexpr_to_qos(const std::string & keyexpr) +{ + rmw_qos_profile_t qos; + const std::vector parts = split_keyexpr(keyexpr, QOS_DELIMITER); + if (parts.size() < 3) { + return std::nullopt; + } + qos.reliability = parts[0] == + "reliable" ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + qos.durability = parts[1] == + "transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : + RMW_QOS_POLICY_DURABILITY_VOLATILE; + const std::vector history_parts = split_keyexpr(parts[2], ','); + if (history_parts.size() < 2) { + return std::nullopt; + } + qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + sscanf(history_parts[1].c_str(), "%zu", &qos.depth); + + return qos; +} + } // namespace ///============================================================================= @@ -156,7 +241,8 @@ Entity::Entity( const auto & topic_info = this->topic_info_.value(); // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". token_ss << - "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; + "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + qos_to_keyexpr( + topic_info.qos_); } this->keyexpr_ = token_ss.str(); @@ -186,41 +272,6 @@ std::optional Entity::make( return entity; } -namespace -{ -///============================================================================= -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const size_t prev_idx = delim_idx[i - 1]; - const size_t idx = delim_idx[i]; - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} - -} // namespace - ///============================================================================= std::optional Entity::make(const std::string & keyexpr) { @@ -276,10 +327,18 @@ std::optional Entity::make(const std::string & keyexpr) "Received liveliness token for non-node entity without required parameters."); return std::nullopt; } + std::optional qos = keyexpr_to_qos(parts[8]); + if (!qos.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid qos keyexpr"); + return std::nullopt; + } topic_info = TopicInfo{ demangle_name(std::move(parts[6])), std::move(parts[7]), - std::move(parts[8])}; + std::move(qos.value()) + }; } return Entity{ diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 66f2f11b..61635af3 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -21,6 +21,8 @@ #include #include +#include "rmw/types.h" + namespace liveliness { @@ -45,12 +47,12 @@ struct TopicInfo { std::string name_; std::string type_; - std::string qos_; + rmw_qos_profile_t qos_; TopicInfo( std::string name, std::string type, - std::string qos); + rmw_qos_profile_t qos); }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index b37f3445..c1c03041 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -456,12 +456,12 @@ rmw_create_publisher( } } // Adapt any 'best available' QoS options - // rmw_qos_profile_t adapted_qos_profile = *qos_profile; - // rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( - // node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - // if (RMW_RET_OK != ret) { - // return nullptr; - // } + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( + node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); if (publisher_options->require_unique_network_flow_endpoints == RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) @@ -621,7 +621,7 @@ rmw_create_publisher( liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, - publisher_data->type_support->get_name(), "reliable"} + publisher_data->type_support->get_name(), adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1335,7 +1335,7 @@ rmw_create_subscription( liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, - sub_data->type_support->get_name(), "reliable"} + sub_data->type_support->get_name(), adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1905,7 +1905,7 @@ rmw_create_client( liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, - std::move(service_type), "reliable"} + std::move(service_type), *qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2493,7 +2493,7 @@ rmw_create_service( liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, - std::move(service_type), "reliable"} + std::move(service_type), *qos_profiles} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( From eed1d127cd301eac0d68cdcb8cb01f447f6ee489 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 11 Jan 2024 23:58:53 +0800 Subject: [PATCH 03/31] Initialize pubcache only if durability is transient local Signed-off-by: Yadunund --- .../DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 2 +- .../DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 | 2 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 3 +++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 25 +++++++++++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 434de9fe..31803a90 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -69,7 +69,7 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - enabled: { router: true, peer: false, client: false }, + enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 43550e1d..ea13cc64 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -69,7 +69,7 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - enabled: { router: true, peer: false, client: false }, + enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 3df33b9d..e9e6f471 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -74,6 +74,9 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; + // Optional publication cache when durability is transient_local. + ze_owned_publication_cache_t pub_cache; + // Liveliness token for the publisher. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index c1c03041..9a3854d6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -584,6 +584,29 @@ rmw_create_publisher( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + + // Create a Publication Cache if durability is transient_local. + publisher_data->pub_cache = ze_publication_cache_null(); + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = adapted_qos_profile.depth; + publisher_data->pub_cache = ze_declare_publication_cache( + z_loan(context_impl->session), + z_loan(keyexpr), + &pub_cache_opts + ); + if (!z_check(publisher_data->pub_cache)) { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } + } + auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( + [publisher_data]() { + if (publisher_data) { + z_drop(z_move(publisher_data->pub_cache)); + } + }); + // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( z_loan(context_impl->session), @@ -648,6 +671,7 @@ rmw_create_publisher( } free_token.cancel(); + undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -702,6 +726,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // return RMW_RET_ERROR; // } z_drop(z_move(publisher_data->token)); + z_drop(z_move(publisher_data->pub_cache)); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state); From 32c788b8d3cecc3f04630ba165627ecd4f18ff90 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 12 Jan 2024 00:17:30 +0800 Subject: [PATCH 04/31] Set publisher congestion when appropriate Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 12 +++++++----- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 11 ++++++++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 66aab243..00cb272c 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -68,6 +68,7 @@ static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; static const char SLASH_REPLACEMENT = '%'; static const char QOS_DELIMITER = ':'; +static const char QOS_HISTORY_DELIMITER = ','; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -140,7 +141,7 @@ std::vector split_keyexpr( * Where: * - "reliable" or "best_effort". * - "volatile" or "transient_local". - * - "keep_last". + * - "keep_all" or "keep_last". * - The depth number. */ // TODO(Yadunund): Rely on maps to retrieve strings. @@ -152,8 +153,8 @@ std::string qos_to_keyexpr(rmw_qos_profile_t qos) keyexpr += qos.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile"; keyexpr += QOS_DELIMITER; - // TODO(Yadunund): Update once we properly support History. - keyexpr += "keep_last,"; + keyexpr += qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL ? "keep_all" : "keep_last"; + keyexpr += QOS_HISTORY_DELIMITER; keyexpr += std::to_string(qos.depth); return keyexpr; } @@ -171,11 +172,12 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) qos.durability = parts[1] == "transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; - const std::vector history_parts = split_keyexpr(parts[2], ','); + const std::vector history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER); if (history_parts.size() < 2) { return std::nullopt; } - qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + qos.history = history_parts[0] == + "keep_all" ? RMW_QOS_POLICY_HISTORY_KEEP_ALL : RMW_QOS_POLICY_HISTORY_KEEP_LAST; sscanf(history_parts[1].c_str(), "%zu", &qos.depth); return qos; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 9a3854d6..e70321a9 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -607,11 +607,20 @@ rmw_create_publisher( } }); + // Set congestion_control to BLOCK if appropriate. + z_publisher_options_t opts = z_publisher_options_default(); + if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } else { + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( z_loan(context_impl->session), z_loan(keyexpr), - NULL + &opts ); if (!z_check(publisher_data->pub)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher"); From eb91c0eca03a64f475ec4cf7a01a0290a992b940 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 12 Jan 2024 00:41:03 +0800 Subject: [PATCH 05/31] Make liveliness tokens more compact Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 34 ++++++------------- rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 1 - 2 files changed, 11 insertions(+), 24 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 00cb272c..726bacb6 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -86,21 +86,14 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; -// Map string literals to qos enums. -// static const std::unordered_map str_to_qos_e = { -// {"reliable", RMW_QOS_POLICY_RELIABILITY_RELIABLE}, -// {"best_effort", RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT} -// }; - std::string zid_to_str(z_id_t id) { std::stringstream ss; ss << std::hex; size_t i = 0; - for (; i < (sizeof(id.id) - 1); i++) { - ss << static_cast(id.id[i]) << "."; + for (; i < (sizeof(id.id)); i++) { + ss << static_cast(id.id[i]); } - ss << static_cast(id.id[i]); return ss.str(); } @@ -139,21 +132,20 @@ std::vector split_keyexpr( * * ::," * Where: - * - "reliable" or "best_effort". - * - "volatile" or "transient_local". - * - "keep_all" or "keep_last". + * - enum value from rmw_qos_reliability_policy_e. + * - enum value from rmw_qos_durability_policy_e. + * - enum value from rmw_qos_history_policy_e. * - The depth number. */ // TODO(Yadunund): Rely on maps to retrieve strings. std::string qos_to_keyexpr(rmw_qos_profile_t qos) { std::string keyexpr = ""; - keyexpr += qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE ? "reliable" : "best_effort"; + keyexpr += std::to_string(qos.reliability); keyexpr += QOS_DELIMITER; - keyexpr += qos.durability == - RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile"; + keyexpr += std::to_string(qos.durability); keyexpr += QOS_DELIMITER; - keyexpr += qos.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL ? "keep_all" : "keep_last"; + keyexpr += std::to_string(qos.history); keyexpr += QOS_HISTORY_DELIMITER; keyexpr += std::to_string(qos.depth); return keyexpr; @@ -167,17 +159,13 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) if (parts.size() < 3) { return std::nullopt; } - qos.reliability = parts[0] == - "reliable" ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - qos.durability = parts[1] == - "transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : - RMW_QOS_POLICY_DURABILITY_VOLATILE; const std::vector history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER); if (history_parts.size() < 2) { return std::nullopt; } - qos.history = history_parts[0] == - "keep_all" ? RMW_QOS_POLICY_HISTORY_KEEP_ALL : RMW_QOS_POLICY_HISTORY_KEEP_LAST; + sscanf(parts[0].c_str(), "%zu", &qos.reliability); + sscanf(parts[1].c_str(), "%zu", &qos.durability); + sscanf(history_parts[0].c_str(), "%zu", &qos.history); sscanf(history_parts[1].c_str(), "%zu", &qos.depth); return qos; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 61635af3..031edc3a 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -23,7 +23,6 @@ #include "rmw/types.h" - namespace liveliness { From 21a0d8c7af585d4c237a6b21e6f79a52c06e6596 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 12 Jan 2024 01:27:06 +0800 Subject: [PATCH 06/31] Fix rmw_publisher_count_matched_subscriptions Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 21 +++++++++++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 4 ++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 24 +++++++++++++++++------- 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 8e390d62..1cb4288a 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -34,6 +34,7 @@ #include "rmw/validate_node_name.h" #include "graph_cache.hpp" +#include "rmw_data_types.hpp" ///============================================================================= using Entity = liveliness::Entity; @@ -711,6 +712,26 @@ rmw_ret_t GraphCache::get_topic_names_and_types( return fill_names_and_types(graph_topics_, allocator, topic_names_and_types); } +///============================================================================= +rmw_ret_t GraphCache::publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + // TODO(Yadunund): Check if QoS settings also match. + *subscription_count = 0; + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name); + if (topic_it != graph_topics_.end()) { + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find( + pub_data->type_support->get_name()); + if (topic_data_it != topic_it->second.end()) { + *subscription_count = topic_data_it->second->stats_.sub_count_; + } + } + + return RMW_RET_OK; +} + ///============================================================================= rmw_ret_t GraphCache::get_service_names_and_types( rcutils_allocator_t * allocator, diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index adeb3e01..7c19700d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -106,6 +106,10 @@ class GraphCache final bool no_demangle, rmw_names_and_types_t * topic_names_and_types) const; + rmw_ret_t publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count); + rmw_ret_t get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types) const; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index e70321a9..6d66202f 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -572,8 +572,6 @@ rmw_create_publisher( allocator->deallocate(const_cast(rmw_publisher->topic_name), allocator->state); }); - // TODO(yadunund): Parse adapted_qos_profile and publisher_options to generate - // a z_publisher_put_options struct instead of passing NULL to this function. z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( topic_name, node->context->actual_domain_id, allocator); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( @@ -954,11 +952,23 @@ rmw_publisher_count_matched_subscriptions( const rmw_publisher_t * publisher, size_t * subscription_count) { - static_cast(publisher); - static_cast(subscription_count); - // TODO(yadunund): Fixme. - *subscription_count = 0; - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription_count, RMW_RET_INVALID_ARGUMENT); + + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + + return context_impl->graph_cache.publisher_count_matched_subscriptions( + publisher, subscription_count); } //============================================================================== From 4e86b102150c195e9e6a8a82741ac0ffe4df41a3 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 15 Jan 2024 10:23:07 +0800 Subject: [PATCH 07/31] Explictly map qos settings Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 43 +++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 726bacb6..f44a6e74 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -86,6 +86,31 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; +static const std::unordered_map str_to_qos_history = { + {std::to_string(RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT), RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT}, + {std::to_string(RMW_QOS_POLICY_HISTORY_KEEP_LAST), RMW_QOS_POLICY_HISTORY_KEEP_LAST}, + {std::to_string(RMW_QOS_POLICY_HISTORY_KEEP_ALL), RMW_QOS_POLICY_HISTORY_KEEP_ALL}, + {std::to_string(RMW_QOS_POLICY_HISTORY_UNKNOWN), RMW_QOS_POLICY_HISTORY_UNKNOWN} +}; + +static const std::unordered_map str_to_qos_reliability = { + {std::to_string(RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT), + RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT}, + {std::to_string(RMW_QOS_POLICY_RELIABILITY_RELIABLE), RMW_QOS_POLICY_RELIABILITY_RELIABLE}, + {std::to_string(RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT), RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT}, + {std::to_string(RMW_QOS_POLICY_RELIABILITY_UNKNOWN), RMW_QOS_POLICY_RELIABILITY_UNKNOWN} +}; + +static const std::unordered_map str_to_qos_durability = { + {std::to_string(RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT), + RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT}, + {std::to_string(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL), + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL}, + {std::to_string(RMW_QOS_POLICY_DURABILITY_VOLATILE), RMW_QOS_POLICY_DURABILITY_VOLATILE}, + {std::to_string(RMW_QOS_POLICY_DURABILITY_UNKNOWN), RMW_QOS_POLICY_DURABILITY_UNKNOWN} +}; + std::string zid_to_str(z_id_t id) { std::stringstream ss; @@ -163,11 +188,23 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) if (history_parts.size() < 2) { return std::nullopt; } - sscanf(parts[0].c_str(), "%zu", &qos.reliability); - sscanf(parts[1].c_str(), "%zu", &qos.durability); - sscanf(history_parts[0].c_str(), "%zu", &qos.history); + + try { + qos.history = str_to_qos_history.at(history_parts[0]); + qos.reliability = str_to_qos_reliability.at(parts[0]); + qos.durability = str_to_qos_durability.at(parts[1]); + } catch (const std::exception & e) { + return std::nullopt; + } sscanf(history_parts[1].c_str(), "%zu", &qos.depth); + // Liveliness is always automatic given liveliness tokens. + qos.liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + qos.liveliness_lease_duration = RMW_QOS_LIVELINESS_LEASE_DURATION_DEFAULT; + + // TODO(Yadunund): Update once we support these settings. + qos.deadline = RMW_QOS_DEADLINE_DEFAULT; + qos.lifespan = RMW_QOS_LIFESPAN_DEFAULT; return qos; } From 9d49ef3476e4d4c4e7832efa406adb9f16f94cae Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 15 Jan 2024 11:00:51 +0800 Subject: [PATCH 08/31] Reuse qos_profile_check_compatibility() from rmw_dds_common Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_qos.cpp | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_qos.cpp b/rmw_zenoh_cpp/src/rmw_qos.cpp index 8ae5b4f2..2cb42577 100644 --- a/rmw_zenoh_cpp/src/rmw_qos.cpp +++ b/rmw_zenoh_cpp/src/rmw_qos.cpp @@ -16,6 +16,8 @@ #include "rmw/types.h" #include "rmw/qos_profiles.h" +#include "rmw_dds_common/qos.hpp" + extern "C" { rmw_ret_t @@ -26,28 +28,14 @@ rmw_qos_profile_check_compatible( char * reason, size_t reason_size) { - if (!compatibility) { - RMW_SET_ERROR_MSG("compatibility parameter is null"); - return RMW_RET_INVALID_ARGUMENT; - } - - if (!reason && reason_size != 0u) { - RMW_SET_ERROR_MSG("reason parameter is null, but reason_size parameter is not zero"); - return RMW_RET_INVALID_ARGUMENT; - } - - // Presume profiles are compatible until proven otherwise - *compatibility = RMW_QOS_COMPATIBILITY_OK; - - // Initialize reason buffer - if (reason && reason_size != 0u) { - reason[0] = '\0'; - } - - // TODO(clalancette): check compatibility in Zenoh QOS profiles. - (void)publisher_profile; - (void)subscription_profile; - return RMW_RET_OK; + // In Zenoh, publishers do not have any reliability settings. + // A publisher and subscription are only incompatible if the durability of the publisher is + // TRANSIENT_LOCAL but that of the subscription is not. In such a scenario, a late-joining + // subscription can fail to receive messages so we flag it accordingly. + // However, we can reuse the qos_profile_check_compatible() method from rmw_dds_common + // since it largely applies in rmw_zenoh. + return rmw_dds_common::qos_profile_check_compatible( + publisher_profile, subscription_profile, compatibility, reason, reason_size); } } // extern "C" From b7a27bed361e432cef0157435019c4980e8ef8da Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 16 Jan 2024 16:35:58 +0800 Subject: [PATCH 09/31] Make transient_local work Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 40 ++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6d66202f..65100e57 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -455,6 +455,12 @@ rmw_create_publisher( return nullptr; } } + + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[rmw_create_publisher] %s", + topic_name); + // Adapt any 'best available' QoS options rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( @@ -584,7 +590,6 @@ rmw_create_publisher( } // Create a Publication Cache if durability is transient_local. - publisher_data->pub_cache = ze_publication_cache_null(); if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); pub_cache_opts.history = adapted_qos_profile.depth; @@ -733,6 +738,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // return RMW_RET_ERROR; // } z_drop(z_move(publisher_data->token)); + publisher_data->pub_cache = ze_publication_cache_null(); z_drop(z_move(publisher_data->pub_cache)); RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); @@ -1192,6 +1198,11 @@ rmw_create_subscription( } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); + RCUTILS_LOG_INFO_NAMED( + "rmw_zenoh_cpp", + "[rmw_create_subscription] %s", + topic_name); + const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { // error was already set by find_message_type_support @@ -1327,13 +1338,18 @@ rmw_create_subscription( // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. + z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); + auto always_drop_keystr = rcpputils::make_scope_exit( + [&owned_key_str]() { + z_drop(z_move(owned_key_str)); + }); + sub_data->reliable = false; if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; sub_data->reliable = true; - sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE; } sub_data->sub = ze_declare_querying_subscriber( z_loan(context_impl->session), @@ -1345,6 +1361,7 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } + printf("Created querying sub %s\n", owned_key_str._cstr); } // Create a regular subscriber for all other durability settings. else { @@ -1363,14 +1380,21 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } + printf("Created regular sub %s\n", owned_key_str._cstr); } auto undeclare_z_sub = rcpputils::make_scope_exit( [sub_data]() { - // TODO(Yadunund): Check if this is okay or if it is better - // to cast into explicit types and call appropriate undeclare method. - // See rmw_destroy_subscription() - z_drop(z_move(sub_data->sub)); + z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); + if (sub == nullptr || z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_data->sub); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } + } }); // Publish to the graph that a new subscription is in town @@ -3028,7 +3052,7 @@ rmw_wait( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); // TODO(yadunund): Switch to debug log level. - RCUTILS_LOG_WARN_NAMED( + RCUTILS_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "[rmw_wait] %ld subscriptions, %ld services, %ld clients, %ld events, %ld guard conditions", subscriptions->subscriber_count, @@ -3039,7 +3063,7 @@ rmw_wait( // TODO(yadunund): Switch to debug log level. if (wait_timeout) { - RCUTILS_LOG_WARN_NAMED( + RCUTILS_LOG_DEBUG_NAMED( "rmw_zenoh_common_cpp", "[rmw_wait] TIMEOUT: %ld s %ld ns", wait_timeout->sec, wait_timeout->nsec); From 48bb3e5ede21aced9b6e01601c8826deb34ad0b3 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 18 Jan 2024 12:53:37 +0800 Subject: [PATCH 10/31] Close zenoh session in rmw_shutdown Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_init.cpp | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 1b612d88..3c0f88d0 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -351,6 +351,15 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + context->impl->shm_manager = zc_shm_manager_null(); + z_drop(z_move(context->impl->shm_manager)); + z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + // Close the zenoh session + if (z_close(z_move(context->impl->session)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + context->impl->is_shutdown = true; return RMW_RET_OK; } @@ -375,21 +384,8 @@ rmw_context_fini(rmw_context_t * context) return RMW_RET_INVALID_ARGUMENT; } - // We destroy zenoh artifacts here instead of rmw_shutdown() as - // rmw_shutdown() is invoked before rmw_destroy_node() however we still need the session - // alive for the latter. - // TODO(Yadunund): Check if this is a bug in rmw. - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - // Close the zenoh session - if (z_close(z_move(context->impl->session)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } - const rcutils_allocator_t * allocator = &context->options.allocator; - z_drop(z_move(context->impl->shm_manager)); - RMW_TRY_DESTRUCTOR( static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), GuardCondition, ); From 2099b8bc1b75569cd82bc1127e0454040ac056b2 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 18 Jan 2024 13:48:20 +0800 Subject: [PATCH 11/31] store adapted_qos_profiles in pub/sub/client/service Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 4 +- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 17 ++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 136 +++++++++++++------- 3 files changed, 102 insertions(+), 55 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index d26c9e0d..72872e6a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -54,13 +54,13 @@ void sub_data_handler( { std::lock_guard lock(sub_data->message_queue_mutex); - if (sub_data->message_queue.size() >= sub_data->queue_depth) { + if (sub_data->message_queue.size() >= sub_data->adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Message queue depth of %ld reached, discarding oldest message " "for subscription for %s", - sub_data->queue_depth, + sub_data->adapted_qos_profile.depth, z_loan(keystr)); std::unique_ptr old = std::move(sub_data->message_queue.back()); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index e9e6f471..dc4058e9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -77,6 +77,9 @@ struct rmw_publisher_data_t // Optional publication cache when durability is transient_local. ze_owned_publication_cache_t pub_cache; + // Store the actual QoS profile used to configure this publisher. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the publisher. zc_owned_liveliness_token_t token; @@ -117,6 +120,9 @@ struct rmw_subscription_data_t // An owned subscriber or querying_subscriber depending on the QoS settings. std::variant sub; + // Store the actual QoS profile used to configure this subscription. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the subscription. zc_owned_liveliness_token_t token; @@ -128,9 +134,6 @@ struct rmw_subscription_data_t std::deque> message_queue; std::mutex message_queue_mutex; - size_t queue_depth; - bool reliable; - std::mutex internal_mutex; std::condition_variable * condition{nullptr}; }; @@ -151,6 +154,10 @@ struct rmw_service_data_t z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; + // Store the actual QoS profile used to configure this service. + // The QoS is reused for getting requests and sending responses. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the service. zc_owned_liveliness_token_t token; @@ -181,6 +188,10 @@ struct rmw_client_data_t z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; + // Store the actual QoS profile used to configure this client. + // The QoS is reused for sending requests and getting responses. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the client. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 65100e57..56edc921 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -461,13 +461,6 @@ rmw_create_publisher( "[rmw_create_publisher] %s", topic_name); - // Adapt any 'best available' QoS options - rmw_qos_profile_t adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( - node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - if (RMW_RET_OK != ret) { - return nullptr; - } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); if (publisher_options->require_unique_network_flow_endpoints == RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) @@ -529,6 +522,21 @@ rmw_create_publisher( allocator->deallocate(publisher_data, allocator->state); }); + RMW_TRY_PLACEMENT_NEW(publisher_data, publisher_data, return nullptr, rmw_publisher_data_t); + auto destruct_pub_data = rcpputils::make_scope_exit( + [publisher_data]() { + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + publisher_data->~rmw_publisher_data_t(), + rmw_publisher_data_t); + }); + + // Adapt any 'best available' QoS options + publisher_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( + node, topic_name, &publisher_data->adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -590,9 +598,9 @@ rmw_create_publisher( } // Create a Publication Cache if durability is transient_local. - if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); - pub_cache_opts.history = adapted_qos_profile.depth; + pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; publisher_data->pub_cache = ze_declare_publication_cache( z_loan(context_impl->session), z_loan(keyexpr), @@ -612,8 +620,8 @@ rmw_create_publisher( // Set congestion_control to BLOCK if appropriate. z_publisher_options_t opts = z_publisher_options_default(); - if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && - adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + if (publisher_data->adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + publisher_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } else { @@ -656,7 +664,7 @@ rmw_create_publisher( liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, - publisher_data->type_support->get_name(), adapted_qos_profile} + publisher_data->type_support->get_name(), publisher_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -686,6 +694,7 @@ rmw_create_publisher( undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); + destruct_pub_data.cancel(); destruct_msg_type_support.cancel(); free_type_support.cancel(); free_publisher_data.cancel(); @@ -747,6 +756,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) RMW_SET_ERROR_MSG("failed to undeclare pub"); ret = RMW_RET_ERROR; } + RMW_TRY_DESTRUCTOR(publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t, ); allocator->deallocate(publisher_data, allocator->state); } allocator->deallocate(const_cast(publisher->topic_name), allocator->state); @@ -984,9 +994,18 @@ rmw_publisher_get_actual_qos( const rmw_publisher_t * publisher, rmw_qos_profile_t * qos) { - static_cast(publisher); - static_cast(qos); - // TODO(yadunund): Fixme. + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + + *qos = pub_data->adapted_qos_profile; return RMW_RET_OK; } @@ -1188,14 +1207,6 @@ rmw_create_subscription( return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - // Adapt any 'best available' QoS options - rmw_qos_profile_t adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( - node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); - return nullptr; - } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); RCUTILS_LOG_INFO_NAMED( @@ -1268,6 +1279,15 @@ rmw_create_subscription( rmw_subscription_data_t); }); + // Adapt any 'best available' QoS options + sub_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( + node, topic_name, &sub_data->adapted_qos_profile, rmw_get_publishers_info_by_topic); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; + } + sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -1294,7 +1314,6 @@ rmw_create_subscription( MessageTypeSupport); }); - sub_data->queue_depth = qos_profile->depth; sub_data->context = node->context; rmw_subscription->implementation_identifier = rmw_zenoh_identifier; @@ -1344,12 +1363,10 @@ rmw_create_subscription( z_drop(z_move(owned_key_str)); }); - sub_data->reliable = false; - if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); - if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + if (sub_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; } sub_data->sub = ze_declare_querying_subscriber( z_loan(context_impl->session), @@ -1368,7 +1385,6 @@ rmw_create_subscription( z_subscriber_options_t sub_options = z_subscriber_options_default(); if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; } sub_data->sub = z_declare_subscriber( z_loan(context_impl->session), @@ -1403,7 +1419,7 @@ rmw_create_subscription( liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, - sub_data->type_support->get_name(), adapted_qos_profile} + sub_data->type_support->get_name(), sub_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1528,10 +1544,7 @@ rmw_subscription_get_actual_qos( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : - RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - qos->durability = std::holds_alternative(sub_data->sub) ? - RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE; + *qos = sub_data->adapted_qos_profile; return RMW_RET_OK; } @@ -1863,6 +1876,10 @@ rmw_create_client( rmw_client_data_t); }); + // Adapt any 'best available' QoS options + client_data->adapted_qos_profile = + rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profile); + // Obtain the type support const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -1973,7 +1990,7 @@ rmw_create_client( liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, - std::move(service_type), *qos_profile} + std::move(service_type), client_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2320,9 +2337,18 @@ rmw_client_request_publisher_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { - // TODO(francocipollone): Fix. - static_cast(client); - static_cast(qos); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); + + *qos = client_data->adapted_qos_profile; return RMW_RET_OK; } @@ -2333,10 +2359,8 @@ rmw_client_response_subscription_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { - // TODO(francocipollone): Fix. - static_cast(client); - static_cast(qos); - return RMW_RET_OK; + // The same QoS profile is used for sending requests and receiving responses. + return rmw_client_request_publisher_get_actual_qos(client, qos); } //============================================================================== @@ -2435,6 +2459,11 @@ rmw_create_service( rmw_service_data_t); }); + // Adapt any 'best available' QoS options + service_data->adapted_qos_profile = + rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles); + + // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -2561,7 +2590,7 @@ rmw_create_service( liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, - std::move(service_type), *qos_profiles} + std::move(service_type), service_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2852,9 +2881,18 @@ rmw_service_request_subscription_get_actual_qos( const rmw_service_t * service, rmw_qos_profile_t * qos) { - // TODO(yadunund): Fix. - static_cast(service); - static_cast(qos); + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); + + *qos = service_data->adapted_qos_profile; return RMW_RET_OK; } @@ -2865,10 +2903,8 @@ rmw_service_response_publisher_get_actual_qos( const rmw_service_t * service, rmw_qos_profile_t * qos) { - // TODO(yadunund): Fix. - static_cast(service); - static_cast(qos); - return RMW_RET_OK; + // The same QoS profile is used for receiving requests and sending responses. + return rmw_service_request_subscription_get_actual_qos(service, qos); } //============================================================================== From 7d7df288c86ed70d0b8657361b0ea3195d4ec950 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 18 Jan 2024 13:54:58 +0800 Subject: [PATCH 12/31] Implement rmw_subscription_count_matched_publishers Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/graph_cache.cpp | 20 ++++++++++++++++++++ rmw_zenoh_cpp/src/detail/graph_cache.hpp | 4 ++++ rmw_zenoh_cpp/src/rmw_zenoh.cpp | 19 +++++++++++++++---- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 1cb4288a..51c75a6e 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -732,6 +732,26 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions( return RMW_RET_OK; } +///============================================================================= +rmw_ret_t GraphCache::subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + // TODO(Yadunund): Check if QoS settings also match. + *publisher_count = 0; + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name); + if (topic_it != graph_topics_.end()) { + rmw_subscription_data_t * sub_data = static_cast(subscription->data); + GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find( + sub_data->type_support->get_name()); + if (topic_data_it != topic_it->second.end()) { + *publisher_count = topic_data_it->second->stats_.pub_count_; + } + } + + return RMW_RET_OK; +} + ///============================================================================= rmw_ret_t GraphCache::get_service_names_and_types( rcutils_allocator_t * allocator, diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 7c19700d..b9c5a2ac 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -110,6 +110,10 @@ class GraphCache final const rmw_publisher_t * publisher, size_t * subscription_count); + rmw_ret_t subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count); + rmw_ret_t get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types) const; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 56edc921..0ebaf07f 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1518,12 +1518,23 @@ rmw_subscription_count_matched_publishers( const rmw_subscription_t * subscription, size_t * publisher_count) { - static_cast(subscription); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher_count, RMW_RET_INVALID_ARGUMENT); - // TODO(clalancette): implement - *publisher_count = 0; + rmw_subscription_data_t * sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return RMW_RET_UNSUPPORTED; + return context_impl->graph_cache.subscription_count_matched_publishers( + subscription, publisher_count); } //============================================================================== From cba604fa318dbf5ebe4058098709e411e9ae65a9 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 19 Jan 2024 00:36:14 +0800 Subject: [PATCH 13/31] Use optional for pubcache and set consolidation for querying sub Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index dc4058e9..462ef9f6 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -75,7 +75,7 @@ struct rmw_publisher_data_t z_owned_publisher_t pub; // Optional publication cache when durability is transient_local. - ze_owned_publication_cache_t pub_cache; + std::optional pub_cache; // Store the actual QoS profile used to configure this publisher. rmw_qos_profile_t adapted_qos_profile; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0ebaf07f..f85d0aac 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -606,15 +606,15 @@ rmw_create_publisher( z_loan(keyexpr), &pub_cache_opts ); - if (!z_check(publisher_data->pub_cache)) { + if (!publisher_data->pub_cache.has_value() || !z_check(publisher_data->pub_cache.value())) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } } auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( [publisher_data]() { - if (publisher_data) { - z_drop(z_move(publisher_data->pub_cache)); + if (publisher_data && publisher_data->pub_cache.has_value()) { + z_drop(z_move(publisher_data->pub_cache.value())); } }); @@ -747,9 +747,9 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // return RMW_RET_ERROR; // } z_drop(z_move(publisher_data->token)); - publisher_data->pub_cache = ze_publication_cache_null(); - z_drop(z_move(publisher_data->pub_cache)); - + if (publisher_data->pub_cache.has_value()) { + z_drop(z_move(publisher_data->pub_cache.value())); + } RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state); if (z_undeclare_publisher(z_move(publisher_data->pub))) { @@ -1365,6 +1365,15 @@ rmw_create_subscription( if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); + // TODO(Yadunund): We need the query_target to be ALL_COMPLETE but the PubCache created + // does not setup a queryable that is complete. Once zettascale exposes that API, + // uncomment below. + // sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE; + sub_options.query_target = Z_QUERY_TARGET_ALL; + // We set consolidation to none as we need to receive transient local messages + // from a number of publishers. Eg: To receive TF data published over /tf_static + // by various publishers. + sub_options.query_consolidation = z_query_consolidation_none(); if (sub_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; } From 762668bcd7d6ec5a717625260d15856f44286041 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 19 Jan 2024 00:45:00 +0800 Subject: [PATCH 14/31] Set default consolidation for client query Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index f85d0aac..2105bef0 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2207,7 +2207,7 @@ rmw_send_request( opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // Latest consolidation guarantees unicity of replies for the same key expression. It optimizes bandwidth. // Default is None which imples replies may come in any order and any number. - opts.consolidation = z_query_consolidation_latest(); + opts.consolidation = z_query_consolidation_default(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; opts.value.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); client_data->zn_closure_reply = z_closure(client_data_handler, nullptr, client_data); From 594bb9735c2462bd2a3a89803b7585300bd17a6d Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 19 Jan 2024 00:59:15 +0800 Subject: [PATCH 15/31] Switch to non-blocking channel for liveliness_get Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_init.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 3c0f88d0..9636d206 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -255,12 +255,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) "Sending Query '%s' to fetch discovery data...", liveliness_str.c_str() ); - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // We create a non-blocking channel that is unbounded, ie. `bound` = 0, to receive // replies for the zc_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which // would lead to deadlocks when trying to receive replies and block the // execution here. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(0); zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(channel.send), NULL); @@ -271,7 +271,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), // &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); - for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } if (z_reply_is_ok(&reply)) { z_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); From 02fee0987da5107fe031954e6ec1387294884641 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 19 Jan 2024 14:23:50 +0800 Subject: [PATCH 16/31] Make shm manager optional Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 5 +++-- rmw_zenoh_cpp/src/rmw_init.cpp | 13 +++++++++---- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 10 ++++++---- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index e8893eaf..a328a534 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -45,8 +45,9 @@ struct rmw_context_impl_s // An owned session. z_owned_session_t session; - // The SHM manager. - zc_owned_shm_manager_t shm_manager; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager; z_owned_subscriber_t graph_subscriber; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index fbfb9e52..c86d2ca4 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -195,14 +195,18 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_loan(context->impl->session), idstr, SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!zc_shm_manager_check(&context->impl->shm_manager)) { + if (!context->impl->shm_manager.has_value() || + !zc_shm_manager_check(&context->impl->shm_manager.value())) + { RMW_SET_ERROR_MSG("Unable to create shm manager."); return RMW_RET_ERROR; } } auto free_shm_manager = rcpputils::make_scope_exit( [context]() { - z_drop(z_move(context->impl->shm_manager)); + if (context->impl->shm_manager.has_value()) { + z_drop(z_move(context->impl->shm_manager.value())); + } }); // Initialize the guard condition. @@ -341,6 +345,9 @@ rmw_shutdown(rmw_context_t * context) return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + if (context->impl->shm_manager.has_value()) { + z_drop(z_move(context->impl->shm_manager.value())); + } // Close the zenoh session if (z_close(z_move(context->impl->session)) < 0) { RMW_SET_ERROR_MSG("Error while closing zenoh session"); @@ -349,8 +356,6 @@ rmw_shutdown(rmw_context_t * context) const rcutils_allocator_t * allocator = &context->options.allocator; - z_drop(z_move(context->impl->shm_manager)); - RMW_TRY_DESTRUCTOR( static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), GuardCondition, ); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index b5bd5eac..1432f4b2 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -878,13 +878,15 @@ rmw_publish( zc_owned_shmbuf_t shmbuf; // Get memory from SHM buffer if available. - if (zc_shm_manager_check(&publisher_data->context->impl->shm_manager)) { + if (publisher_data->context->impl->shm_manager.has_value() && + zc_shm_manager_check(&publisher_data->context->impl->shm_manager.value())) + { shmbuf = zc_shm_alloc( - &publisher_data->context->impl->shm_manager, + &publisher_data->context->impl->shm_manager.value(), max_data_length); if (!z_check(shmbuf)) { - zc_shm_gc(&publisher_data->context->impl->shm_manager); - shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager, max_data_length); + zc_shm_gc(&publisher_data->context->impl->shm_manager.value()); + shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager.value(), max_data_length); if (!z_check(shmbuf)) { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); From 446aec3121850d7f1514f8a7aa50405357f00148 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 19 Jan 2024 16:01:08 +0800 Subject: [PATCH 17/31] Cleanup Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 7a76a2f4..acaa37db 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -58,7 +58,7 @@ void sub_data_handler( if (sub_data->message_queue.size() >= sub_data->adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - RCUTILS_LOG_WARN_NAMED( + RCUTILS_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "Message queue depth of %ld reached, discarding oldest message " "for subscription for %s", diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 1432f4b2..57887309 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -619,12 +619,11 @@ rmw_create_publisher( // Set congestion_control to BLOCK if appropriate. z_publisher_options_t opts = z_publisher_options_default(); + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; if (publisher_data->adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && publisher_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; - } else { - opts.congestion_control = Z_CONGESTION_CONTROL_DROP; } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( @@ -1383,7 +1382,6 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } - printf("Created querying sub %s\n", owned_key_str._cstr); } // Create a regular subscriber for all other durability settings. else { @@ -1401,7 +1399,6 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } - printf("Created regular sub %s\n", owned_key_str._cstr); } auto undeclare_z_sub = rcpputils::make_scope_exit( From 5314de66a1168ea5d903b5ae84a92ab1ad8b846e Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 24 Jan 2024 00:33:22 +0800 Subject: [PATCH 18/31] Explicitly ensure subscription depth is minimum 1 Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 12 +++++++++--- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 4 ++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index acaa37db..89352475 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -65,9 +65,15 @@ void sub_data_handler( sub_data->adapted_qos_profile.depth, z_loan(keystr)); - std::unique_ptr old = std::move(sub_data->message_queue.front()); - z_drop(&old->payload); - sub_data->message_queue.pop_front(); + // If the adapted_qos_profile.depth is 0, the std::move command below will result + // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 + // in rmw_create_subscription() but to be safe, we only attempt to discard from the + // queue if it is non-empty. + if (!sub_data->message_queue.empty()) { + std::unique_ptr old = std::move(sub_data->message_queue.front()); + z_drop(&old->payload); + sub_data->message_queue.pop_front(); + } } sub_data->message_queue.emplace_back( diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 57887309..42b0163d 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1282,6 +1282,10 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); return nullptr; } + // Explicitly ensure the history depth is at least 1. + sub_data->adapted_qos_profile.depth = std::max( + static_cast(1), + sub_data->adapted_qos_profile.depth); sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; From a9b424505c55411dd37db402eae91663673980e8 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 24 Jan 2024 11:09:52 +0800 Subject: [PATCH 19/31] Build zenoh-c with shm feature but do not enable it in the session config Signed-off-by: Yadunund --- rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 | 2 +- zenoh_c_vendor/CMakeLists.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index e1bf138a..837896c0 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -234,7 +234,7 @@ }, /// Shared memory configuration shared_memory: { - enabled: true, // + enabled: false, }, /// Access control configuration auth: { diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 2b3ebdf4..c46edbdc 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -22,7 +22,7 @@ find_package(ament_cmake_vendor_package REQUIRED) # Note: We separate the two args needed for cargo with "$" and not ";" as the # latter is a list separater in cmake and hence the string will be split into two # when expanded. -set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/transport_tcp") +set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/transport_tcp zenoh/shared-memory") ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git From cd0295744cf3142659cf5745b0da71899e9efc45 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 24 Jan 2024 12:10:10 +0800 Subject: [PATCH 20/31] Always drop shmbuf Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 42b0163d..b43c6c25 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -867,15 +867,20 @@ rmw_publish( // To store serialized message byte array. char * msg_bytes = nullptr; - bool from_shm = false; + std::optional shmbuf = std::nullopt; + auto always_free_shmbuf = rcpputils::make_scope_exit( + [&shmbuf]() { + if (shmbuf.has_value()) { + zc_shmbuf_drop(&shmbuf.value()); + } + }); auto free_msg_bytes = rcpputils::make_scope_exit( - [msg_bytes, allocator, from_shm]() { - if (msg_bytes && !from_shm) { + [msg_bytes, allocator, &shmbuf]() { + if (msg_bytes && !shmbuf.has_value()) { allocator->deallocate(msg_bytes, allocator->state); } }); - zc_owned_shmbuf_t shmbuf; // Get memory from SHM buffer if available. if (publisher_data->context->impl->shm_manager.has_value() && zc_shm_manager_check(&publisher_data->context->impl->shm_manager.value())) @@ -883,17 +888,16 @@ rmw_publish( shmbuf = zc_shm_alloc( &publisher_data->context->impl->shm_manager.value(), max_data_length); - if (!z_check(shmbuf)) { + if (!z_check(shmbuf.value())) { zc_shm_gc(&publisher_data->context->impl->shm_manager.value()); shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager.value(), max_data_length); - if (!z_check(shmbuf)) { + if (!z_check(shmbuf.value())) { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); return RMW_RET_ERROR; } } - msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf)); - from_shm = true; + msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf.value())); } else { // Get memory from the allocator. msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); @@ -927,9 +931,9 @@ rmw_publish( z_publisher_put_options_t options = z_publisher_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - if (from_shm) { - zc_shmbuf_set_length(&shmbuf, data_length); - zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf)); + if (shmbuf.has_value()) { + zc_shmbuf_set_length(&shmbuf.value(), data_length); + zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value())); ret = zc_publisher_put_owned(z_loan(publisher_data->pub), z_move(payload), &options); } else { // Returns 0 if success. From 34f2623fe781ad68fcf0abf7107cd99dd341de89 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 24 Jan 2024 12:36:06 +0800 Subject: [PATCH 21/31] Add comment on why timestamping is enabled in zenoh configs Signed-off-by: Yadunund --- rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 | 2 ++ rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 | 2 ++ 2 files changed, 4 insertions(+) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 8e5eb983..cc4825bc 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -69,6 +69,8 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. + /// PublicationCache which is required for transient_local durability + /// only works when time-stamping is enabled. enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 837896c0..df48b032 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -69,6 +69,8 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. + /// PublicationCache which is required for transient_local durability + /// only works when time-stamping is enabled. enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. From 52d16cb5027bb58f9ebb379b36428f149dea09f2 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 24 Jan 2024 13:08:58 +0800 Subject: [PATCH 22/31] Rely on strtol to get queue depth Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 28e9d4fc..d95257f1 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -25,6 +25,8 @@ #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" +#include "rmw/error_handling.h" + namespace liveliness { @@ -194,9 +196,28 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) qos.reliability = str_to_qos_reliability.at(parts[0]); qos.durability = str_to_qos_durability.at(parts[1]); } catch (const std::exception & e) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Error setting QoS values from strings: %s", e.what()); + return std::nullopt; + } + // Get the history depth. + errno = 0; + char * endptr; + int64_t num = strtol(history_parts[1].c_str(), &endptr, 10); + if (endptr == history_parts[1].c_str()) { + // No values were converted, this is an error + RMW_SET_ERROR_MSG("no valid numbers available"); + return std::nullopt; + } else if (*endptr != '\0') { + // There was junk after the number + RMW_SET_ERROR_MSG("non-numeric values"); + return std::nullopt; + } else if (errno != 0) { + // Some other error occurred, which may include overflow or underflow + RMW_SET_ERROR_MSG( + "an undefined error occurred while getting the number, this may be an overflow"); return std::nullopt; } - sscanf(history_parts[1].c_str(), "%zu", &qos.depth); + qos.depth = num; // Liveliness is always automatic given liveliness tokens. qos.liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; From 2ad493d518c06d827df7f8a8d7da1acdfea9bd4a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Mon, 29 Jan 2024 14:10:28 +0800 Subject: [PATCH 23/31] Stick to blocking channel for liveliness get Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_init.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index c86d2ca4..85debdec 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -251,12 +251,21 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Query router/liveliness participants to get graph information before this session was started. - // We create a non-blocking channel that is unbounded, ie. `bound` = 0, to receive + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive // replies for the zc_liveliness_get() call. This is necessary as if the `bound` // is too low, the channel may starve the zenoh executor of its threads which // would lead to deadlocks when trying to receive replies and block the // execution here. - z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(0); + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), z_move(channel.send), NULL); From 1298abd1ea3124b4a1bca54c4eec6795206766d8 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 31 Jan 2024 11:28:29 +0800 Subject: [PATCH 24/31] Define default history depth Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index c224e391..8cc5c9cf 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -57,6 +57,8 @@ #include "rmw_dds_common/qos.hpp" +#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 1; + namespace { @@ -1286,10 +1288,11 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); return nullptr; } - // Explicitly ensure the history depth is at least 1. - sub_data->adapted_qos_profile.depth = std::max( - static_cast(1), - sub_data->adapted_qos_profile.depth); + // If a depth of 0 was provided, the RMW implementation can set the depth to a default. + sub_data->adapted_qos_profile.depth = + sub_data->adapted_qos_profile.depth > 0 ? + sub_data->adapted_qos_profile.depth : + RMW_ZENOH_DEFAULT_HISTORY_DEPTH; sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; @@ -2546,7 +2549,6 @@ rmw_create_service( service_data->adapted_qos_profile = rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles); - // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { From 66494e3293ce33d735b9f978fdcb87a23e4e68a9 Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 7 Feb 2024 11:36:34 +0800 Subject: [PATCH 25/31] Update rmw_zenoh_cpp/src/rmw_zenoh.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 8cc5c9cf..0f92a083 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1393,9 +1393,8 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return nullptr; } - } - // Create a regular subscriber for all other durability settings. - else { + } else { + // Create a regular subscriber for all other durability settings. z_subscriber_options_t sub_options = z_subscriber_options_default(); if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { sub_options.reliability = Z_RELIABILITY_RELIABLE; From a8073437e3bd82198da09f35ffc005cd98b88201 Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 7 Feb 2024 11:36:44 +0800 Subject: [PATCH 26/31] Update rmw_zenoh_cpp/src/rmw_zenoh.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 0f92a083..8ec455d5 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1288,7 +1288,7 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); return nullptr; } - // If a depth of 0 was provided, the RMW implementation can set the depth to a default. + // If a depth of 0 was provided, the RMW implementation should choose a suitable default. sub_data->adapted_qos_profile.depth = sub_data->adapted_qos_profile.depth > 0 ? sub_data->adapted_qos_profile.depth : From c807b34855ffbb454c0056c2fa02d1759cf64116 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 7 Feb 2024 12:28:10 +0800 Subject: [PATCH 27/31] Add example for liveliness keyexpr Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index d95257f1..71a44585 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -163,6 +163,9 @@ std::vector split_keyexpr( * - enum value from rmw_qos_durability_policy_e. * - enum value from rmw_qos_history_policy_e. * - The depth number. + * For example, the liveliness substring for a topic with Reliability policy: reliable, + * Durability policy: volatile, History policy: keep_last, and depth: 10, would be + * "1:2:1,10". See rmw/types.h for the values of each policy enum. */ // TODO(Yadunund): Rely on maps to retrieve strings. std::string qos_to_keyexpr(rmw_qos_profile_t qos) @@ -228,7 +231,6 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) qos.lifespan = RMW_QOS_LIFESPAN_DEFAULT; return qos; } - } // namespace ///============================================================================= @@ -268,7 +270,15 @@ Entity::Entity( * //////// * - The ROS topic name for this entity. * - The type for the topic. - * - The qos for the topic. + * - The qos for the topic (see qos_to_keyexpr() docstring for more information). + * + * For example, the liveliness expression for a publisher within a /talker node that publishes + * an std_msgs/msg/String over topic /chatter and with QoS settings of Reliability: best_effort, + * Durability: transient_local, History: keep_all, and depth: 10, would be + * "@ros2_lv/0/q1w2e3r4t5y6/MP/_/talker/dds_::std_msgs::msg::String/2:1:2,10". + * Note: The domain_id is assumed to be 0 and a random id is used in the example. Also the + * _dds:: prefix in the topic_type is an artifact of the type support implementation and is + * removed when reporting the topic_type in graph_cache.cpp (see _demangle_if_ros_type()). */ std::stringstream token_ss; const std::string & ns = node_info_.ns_; From 009837101b2f70e3b6cae2db88a45cbbd742a4cb Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 7 Feb 2024 14:39:32 +0800 Subject: [PATCH 28/31] Simplify split_keyexpr Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 71a44585..a2aef62e 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -128,29 +128,16 @@ std::vector split_keyexpr( const std::string & keyexpr, const char delim = '/') { - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const size_t prev_idx = delim_idx[i - 1]; - const size_t idx = delim_idx[i]; - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; + size_t start = 0; + size_t end = keyexpr.find(delim); + while (end != std::string::npos) { + result.push_back(keyexpr.substr(start, end - start)); + start = end + 1; + end = keyexpr.find(delim, start); } // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); + result.push_back(keyexpr.substr(start)); return result; } From e85d45d6d3796b08dc7983562aa025d5ed4a8a56 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 7 Feb 2024 15:05:17 +0800 Subject: [PATCH 29/31] Increase and justify default depth Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 8ec455d5..5f9af074 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -57,7 +57,15 @@ #include "rmw_dds_common/qos.hpp" -#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 1; +// If the depth field in the qos profile is set to 0, the RMW implementation +// has the liberty to assign a default depth. The zenoh transport protocol +// is configured with 256 channels so theoretically, this would be the maximum +// depth we can set before blocking transport. A high depth would increase the +// memory footprint of processes as more messages are stored in memory while a +// very low depth might unintentionally drop messages leading to a poor +// out-of-the-box experience for new users. For now we set the depth to 42, +// a popular "magic number". See https://en.wikipedia.org/wiki/42_(number). +#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 42; namespace { From 966a043d40d5175a38aaa4a73762992b5d2cfe44 Mon Sep 17 00:00:00 2001 From: Yadu Date: Wed, 7 Feb 2024 22:34:01 +0800 Subject: [PATCH 30/31] Update rmw_zenoh_cpp/src/detail/liveliness_utils.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index a2aef62e..db93ec3f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -192,7 +192,7 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) // Get the history depth. errno = 0; char * endptr; - int64_t num = strtol(history_parts[1].c_str(), &endptr, 10); + unsigned long num = strtoul(history_parts[1].c_str(), &endptr, 10); if (endptr == history_parts[1].c_str()) { // No values were converted, this is an error RMW_SET_ERROR_MSG("no valid numbers available"); From d08836073972e51d9d75433800a3a80c431eba8d Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 7 Feb 2024 11:17:13 -0500 Subject: [PATCH 31/31] Fix minor cpplint problems. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 2 +- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 1 - rmw_zenoh_cpp/src/rmw_qos.cpp | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index db93ec3f..430468f1 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -192,7 +192,7 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) // Get the history depth. errno = 0; char * endptr; - unsigned long num = strtoul(history_parts[1].c_str(), &endptr, 10); + size_t num = strtoul(history_parts[1].c_str(), &endptr, 10); if (endptr == history_parts[1].c_str()) { // No values were converted, this is an error RMW_SET_ERROR_MSG("no valid numbers available"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 7effa150..99eb3fd9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -95,7 +95,6 @@ void rmw_subscription_data_t::add_new_message( z_drop(z_move(old->payload)); message_queue_.pop_front(); } - } message_queue_.emplace_back(std::move(msg)); diff --git a/rmw_zenoh_cpp/src/rmw_qos.cpp b/rmw_zenoh_cpp/src/rmw_qos.cpp index 2cb42577..0dcf6a4d 100644 --- a/rmw_zenoh_cpp/src/rmw_qos.cpp +++ b/rmw_zenoh_cpp/src/rmw_qos.cpp @@ -28,7 +28,6 @@ rmw_qos_profile_check_compatible( char * reason, size_t reason_size) { - // In Zenoh, publishers do not have any reliability settings. // A publisher and subscription are only incompatible if the durability of the publisher is // TRANSIENT_LOCAL but that of the subscription is not. In such a scenario, a late-joining