diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index a625bd84..b62cccc4 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) +find_package(rmw_dds_common REQUIRED) find_package(zenoh_c_vendor REQUIRED) find_package(zenohc REQUIRED) @@ -57,6 +58,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw + rmw_dds_common::rmw_dds_common_library zenohc::lib ) diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index f20b002d..cc4825bc 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -69,7 +69,9 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - enabled: { router: true, peer: false, client: false }, + /// PublicationCache which is required for transient_local durability + /// only works when time-stamping is enabled. + enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index 8a07b61e..df48b032 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -69,7 +69,9 @@ timestamping: { /// Whether data messages should be timestamped if not already. /// Accepts a single boolean value or different values for router, peer and client. - enabled: { router: true, peer: false, client: false }, + /// PublicationCache which is required for transient_local durability + /// only works when time-stamping is enabled. + enabled: { router: true, peer: true, client: false }, /// Whether data messages with timestamps in the future should be dropped or not. /// If set to false (default), messages with timestamps in the future are retimestamped. /// Timestamps are ignored if timestamping is disabled. @@ -234,7 +236,7 @@ }, /// Shared memory configuration shared_memory: { - enabled: true, // + enabled: false, }, /// Access control configuration auth: { diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index a53ba6cc..67b488be 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -18,6 +18,7 @@ rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp rmw + rmw_dds_common ament_lint_auto ament_lint_common diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index 3282ade0..c442ddfd 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -34,6 +34,7 @@ #include "rmw/validate_node_name.h" #include "graph_cache.hpp" +#include "rmw_data_types.hpp" ///============================================================================= using Entity = liveliness::Entity; @@ -679,6 +680,46 @@ rmw_ret_t GraphCache::get_topic_names_and_types( return fill_names_and_types(graph_topics_, allocator, topic_names_and_types); } +///============================================================================= +rmw_ret_t GraphCache::publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + // TODO(Yadunund): Check if QoS settings also match. + *subscription_count = 0; + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name); + if (topic_it != graph_topics_.end()) { + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find( + pub_data->type_support->get_name()); + if (topic_data_it != topic_it->second.end()) { + *subscription_count = topic_data_it->second->stats_.sub_count_; + } + } + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t GraphCache::subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + // TODO(Yadunund): Check if QoS settings also match. + *publisher_count = 0; + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name); + if (topic_it != graph_topics_.end()) { + rmw_subscription_data_t * sub_data = static_cast(subscription->data); + GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find( + sub_data->type_support->get_name()); + if (topic_data_it != topic_it->second.end()) { + *publisher_count = topic_data_it->second->stats_.pub_count_; + } + } + + return RMW_RET_OK; +} + ///============================================================================= rmw_ret_t GraphCache::get_service_names_and_types( rcutils_allocator_t * allocator, @@ -893,7 +934,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( entity_type == EntityType::Publisher ? nodes[i]->pubs_ : nodes[i]->subs_; const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second; - for (const auto & [topic_data, _] : topic_data_map) { + for (const auto & [topic_type, topic_data] : topic_data_map) { rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; endpoint_info = rmw_get_zero_initialized_topic_endpoint_info(); @@ -915,7 +956,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( ret = rmw_topic_endpoint_info_set_topic_type( &endpoint_info, - _demangle_if_ros_type(topic_data).c_str(), + _demangle_if_ros_type(topic_type).c_str(), allocator); if (RMW_RET_OK != ret) { return ret; @@ -927,7 +968,16 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( if (RMW_RET_OK != ret) { return ret; } - // TODO(Yadunund): Set type_hash, qos_profile, gid. + + ret = rmw_topic_endpoint_info_set_qos_profile( + &endpoint_info, + &topic_data->info_.qos_ + ); + if (RMW_RET_OK != ret) { + return ret; + } + + // TODO(Yadunund): Set type_hash, gid. } } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 85e08a15..6add0f7c 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -104,6 +104,14 @@ class GraphCache final bool no_demangle, rmw_names_and_types_t * topic_names_and_types) const; + rmw_ret_t publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count); + + rmw_ret_t subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count); + rmw_ret_t get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types) const; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 00718196..430468f1 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -25,6 +25,8 @@ #include "rcpputils/scope_exit.hpp" #include "rcutils/logging_macros.h" +#include "rmw/error_handling.h" + namespace liveliness { @@ -47,7 +49,7 @@ NodeInfo::NodeInfo( TopicInfo::TopicInfo( std::string name, std::string type, - std::string qos) + rmw_qos_profile_t qos) : name_(std::move(name)), type_(std::move(type)), qos_(std::move(qos)) @@ -67,6 +69,8 @@ static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; static const char SLASH_REPLACEMENT = '%'; +static const char QOS_DELIMITER = ':'; +static const char QOS_HISTORY_DELIMITER = ','; static const std::unordered_map entity_to_str = { {EntityType::Node, NODE_STR}, @@ -84,18 +88,136 @@ static const std::unordered_map str_to_entity = { {CLI_STR, EntityType::Client} }; +static const std::unordered_map str_to_qos_history = { + {std::to_string(RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT), RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT}, + {std::to_string(RMW_QOS_POLICY_HISTORY_KEEP_LAST), RMW_QOS_POLICY_HISTORY_KEEP_LAST}, + {std::to_string(RMW_QOS_POLICY_HISTORY_KEEP_ALL), RMW_QOS_POLICY_HISTORY_KEEP_ALL}, + {std::to_string(RMW_QOS_POLICY_HISTORY_UNKNOWN), RMW_QOS_POLICY_HISTORY_UNKNOWN} +}; + +static const std::unordered_map str_to_qos_reliability = { + {std::to_string(RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT), + RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT}, + {std::to_string(RMW_QOS_POLICY_RELIABILITY_RELIABLE), RMW_QOS_POLICY_RELIABILITY_RELIABLE}, + {std::to_string(RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT), RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT}, + {std::to_string(RMW_QOS_POLICY_RELIABILITY_UNKNOWN), RMW_QOS_POLICY_RELIABILITY_UNKNOWN} +}; + +static const std::unordered_map str_to_qos_durability = { + {std::to_string(RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT), + RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT}, + {std::to_string(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL), + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL}, + {std::to_string(RMW_QOS_POLICY_DURABILITY_VOLATILE), RMW_QOS_POLICY_DURABILITY_VOLATILE}, + {std::to_string(RMW_QOS_POLICY_DURABILITY_UNKNOWN), RMW_QOS_POLICY_DURABILITY_UNKNOWN} +}; + std::string zid_to_str(z_id_t id) { std::stringstream ss; ss << std::hex; size_t i = 0; - for (; i < (sizeof(id.id) - 1); i++) { - ss << static_cast(id.id[i]) << "."; + for (; i < (sizeof(id.id)); i++) { + ss << static_cast(id.id[i]); } - ss << static_cast(id.id[i]); return ss.str(); } +std::vector split_keyexpr( + const std::string & keyexpr, + const char delim = '/') +{ + std::vector result = {}; + size_t start = 0; + size_t end = keyexpr.find(delim); + while (end != std::string::npos) { + result.push_back(keyexpr.substr(start, end - start)); + start = end + 1; + end = keyexpr.find(delim, start); + } + // Finally add the last substr. + result.push_back(keyexpr.substr(start)); + return result; +} + +/** + * Convert a rmw_qos_profile_t to a string with format: + * + * ::," + * Where: + * - enum value from rmw_qos_reliability_policy_e. + * - enum value from rmw_qos_durability_policy_e. + * - enum value from rmw_qos_history_policy_e. + * - The depth number. + * For example, the liveliness substring for a topic with Reliability policy: reliable, + * Durability policy: volatile, History policy: keep_last, and depth: 10, would be + * "1:2:1,10". See rmw/types.h for the values of each policy enum. + */ +// TODO(Yadunund): Rely on maps to retrieve strings. +std::string qos_to_keyexpr(rmw_qos_profile_t qos) +{ + std::string keyexpr = ""; + keyexpr += std::to_string(qos.reliability); + keyexpr += QOS_DELIMITER; + keyexpr += std::to_string(qos.durability); + keyexpr += QOS_DELIMITER; + keyexpr += std::to_string(qos.history); + keyexpr += QOS_HISTORY_DELIMITER; + keyexpr += std::to_string(qos.depth); + return keyexpr; +} + +/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. +std::optional keyexpr_to_qos(const std::string & keyexpr) +{ + rmw_qos_profile_t qos; + const std::vector parts = split_keyexpr(keyexpr, QOS_DELIMITER); + if (parts.size() < 3) { + return std::nullopt; + } + const std::vector history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER); + if (history_parts.size() < 2) { + return std::nullopt; + } + + try { + qos.history = str_to_qos_history.at(history_parts[0]); + qos.reliability = str_to_qos_reliability.at(parts[0]); + qos.durability = str_to_qos_durability.at(parts[1]); + } catch (const std::exception & e) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Error setting QoS values from strings: %s", e.what()); + return std::nullopt; + } + // Get the history depth. + errno = 0; + char * endptr; + size_t num = strtoul(history_parts[1].c_str(), &endptr, 10); + if (endptr == history_parts[1].c_str()) { + // No values were converted, this is an error + RMW_SET_ERROR_MSG("no valid numbers available"); + return std::nullopt; + } else if (*endptr != '\0') { + // There was junk after the number + RMW_SET_ERROR_MSG("non-numeric values"); + return std::nullopt; + } else if (errno != 0) { + // Some other error occurred, which may include overflow or underflow + RMW_SET_ERROR_MSG( + "an undefined error occurred while getting the number, this may be an overflow"); + return std::nullopt; + } + qos.depth = num; + + // Liveliness is always automatic given liveliness tokens. + qos.liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + qos.liveliness_lease_duration = RMW_QOS_LIVELINESS_LEASE_DURATION_DEFAULT; + + // TODO(Yadunund): Update once we support these settings. + qos.deadline = RMW_QOS_DEADLINE_DEFAULT; + qos.lifespan = RMW_QOS_LIFESPAN_DEFAULT; + return qos; +} } // namespace ///============================================================================= @@ -135,7 +257,15 @@ Entity::Entity( * //////// * - The ROS topic name for this entity. * - The type for the topic. - * - The qos for the topic. + * - The qos for the topic (see qos_to_keyexpr() docstring for more information). + * + * For example, the liveliness expression for a publisher within a /talker node that publishes + * an std_msgs/msg/String over topic /chatter and with QoS settings of Reliability: best_effort, + * Durability: transient_local, History: keep_all, and depth: 10, would be + * "@ros2_lv/0/q1w2e3r4t5y6/MP/_/talker/dds_::std_msgs::msg::String/2:1:2,10". + * Note: The domain_id is assumed to be 0 and a random id is used in the example. Also the + * _dds:: prefix in the topic_type is an artifact of the type support implementation and is + * removed when reporting the topic_type in graph_cache.cpp (see _demangle_if_ros_type()). */ std::stringstream token_ss; const std::string & ns = node_info_.ns_; @@ -156,7 +286,8 @@ Entity::Entity( const auto & topic_info = this->topic_info_.value(); // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". token_ss << - "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_; + "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + qos_to_keyexpr( + topic_info.qos_); } this->keyexpr_ = token_ss.str(); @@ -186,41 +317,6 @@ std::optional Entity::make( return entity; } -namespace -{ -///============================================================================= -std::vector split_keyexpr( - const std::string & keyexpr, - const char delim = '/') -{ - std::vector delim_idx = {}; - // Insert -1 for starting position to make the split easier when using substr. - delim_idx.push_back(-1); - std::size_t idx = 0; - for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) { - if (*it == delim) { - delim_idx.push_back(idx); - } - ++idx; - } - std::vector result = {}; - try { - for (std::size_t i = 1; i < delim_idx.size(); ++i) { - const size_t prev_idx = delim_idx[i - 1]; - const size_t idx = delim_idx[i]; - result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1)); - } - } catch (const std::exception & e) { - printf("%s\n", e.what()); - return {}; - } - // Finally add the last substr. - result.push_back(keyexpr.substr(delim_idx.back() + 1)); - return result; -} - -} // namespace - ///============================================================================= std::optional Entity::make(const std::string & keyexpr) { @@ -276,10 +372,18 @@ std::optional Entity::make(const std::string & keyexpr) "Received liveliness token for non-node entity without required parameters."); return std::nullopt; } + std::optional qos = keyexpr_to_qos(parts[8]); + if (!qos.has_value()) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Received liveliness token with invalid qos keyexpr"); + return std::nullopt; + } topic_info = TopicInfo{ demangle_name(std::move(parts[6])), std::move(parts[7]), - std::move(parts[8])}; + std::move(qos.value()) + }; } return Entity{ diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 66f2f11b..031edc3a 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -21,6 +21,7 @@ #include #include +#include "rmw/types.h" namespace liveliness { @@ -45,12 +46,12 @@ struct TopicInfo { std::string name_; std::string type_; - std::string qos_; + rmw_qos_profile_t qos_; TopicInfo( std::string name, std::string type, - std::string qos); + rmw_qos_profile_t qos); }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 0ee14199..99eb3fd9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -77,18 +77,24 @@ void rmw_subscription_data_t::add_new_message( { std::lock_guard lock(message_queue_mutex_); - if (message_queue_.size() >= queue_depth) { + if (message_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - RCUTILS_LOG_WARN_NAMED( + RCUTILS_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "Message queue depth of %ld reached, discarding oldest message " "for subscription for %s", - queue_depth, + adapted_qos_profile.depth, topic_name.c_str()); - std::unique_ptr old = std::move(message_queue_.front()); - z_drop(z_move(old->payload)); - message_queue_.pop_front(); + // If the adapted_qos_profile.depth is 0, the std::move command below will result + // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 + // in rmw_create_subscription() but to be safe, we only attempt to discard from the + // queue if it is non-empty. + if (!message_queue_.empty()) { + std::unique_ptr old = std::move(message_queue_.front()); + z_drop(z_move(old->payload)); + message_queue_.pop_front(); + } } message_queue_.emplace_back(std::move(msg)); diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 133dd876..0100211f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include "rcutils/allocator.h" @@ -44,8 +45,9 @@ struct rmw_context_impl_s // An owned session. z_owned_session_t session; - // The SHM manager. - zc_owned_shm_manager_t shm_manager; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager; z_owned_subscriber_t graph_subscriber; @@ -74,6 +76,12 @@ struct rmw_publisher_data_t // An owned publisher. z_owned_publisher_t pub; + // Optional publication cache when durability is transient_local. + std::optional pub_cache; + + // Store the actual QoS profile used to configure this publisher. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the publisher. zc_owned_liveliness_token_t token; @@ -112,7 +120,11 @@ struct saved_msg_data class rmw_subscription_data_t final { public: - z_owned_subscriber_t sub; + // An owned subscriber or querying_subscriber depending on the QoS settings. + std::variant sub; + + // Store the actual QoS profile used to configure this subscription. + rmw_qos_profile_t adapted_qos_profile; // Liveliness token for the subscription. zc_owned_liveliness_token_t token; @@ -122,9 +134,6 @@ class rmw_subscription_data_t final MessageTypeSupport * type_support; rmw_context_t * context; - size_t queue_depth; - bool reliable; - void attach_condition(std::condition_variable * condition_variable); void detach_condition(); @@ -173,6 +182,10 @@ class rmw_service_data_t final z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; + // Store the actual QoS profile used to configure this service. + // The QoS is reused for getting requests and sending responses. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the service. zc_owned_liveliness_token_t token; @@ -234,6 +247,10 @@ class rmw_client_data_t final z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; + // Store the actual QoS profile used to configure this client. + // The QoS is reused for sending requests and getting responses. + rmw_qos_profile_t adapted_qos_profile; + // Liveliness token for the client. zc_owned_liveliness_token_t token; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 9cff5ca2..85debdec 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -195,14 +195,18 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_loan(context->impl->session), idstr, SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!zc_shm_manager_check(&context->impl->shm_manager)) { + if (!context->impl->shm_manager.has_value() || + !zc_shm_manager_check(&context->impl->shm_manager.value())) + { RMW_SET_ERROR_MSG("Unable to create shm manager."); return RMW_RET_ERROR; } } auto free_shm_manager = rcpputils::make_scope_exit( [context]() { - z_drop(z_move(context->impl->shm_manager)); + if (context->impl->shm_manager.has_value()) { + z_drop(z_move(context->impl->shm_manager.value())); + } }); // Initialize the guard condition. @@ -252,6 +256,15 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // is too low, the channel may starve the zenoh executor of its threads which // would lead to deadlocks when trying to receive replies and block the // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. z_owned_reply_channel_t channel = zc_reply_fifo_new(0); zc_liveliness_get( z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), @@ -263,7 +276,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), "", z_move(channel.send), // &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); - for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } if (z_reply_is_ok(&reply)) { z_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); @@ -335,11 +353,10 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - // We destroy zenoh artifacts here instead of rmw_shutdown() as - // rmw_shutdown() is invoked before rmw_destroy_node() however we still need the session - // alive for the latter. - // TODO(Yadunund): Check if this is a bug in rmw. z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + if (context->impl->shm_manager.has_value()) { + z_drop(z_move(context->impl->shm_manager.value())); + } // Close the zenoh session if (z_close(z_move(context->impl->session)) < 0) { RMW_SET_ERROR_MSG("Error while closing zenoh session"); @@ -348,8 +365,6 @@ rmw_shutdown(rmw_context_t * context) const rcutils_allocator_t * allocator = &context->options.allocator; - z_drop(z_move(context->impl->shm_manager)); - RMW_TRY_DESTRUCTOR( static_cast(context->impl->graph_guard_condition->data)->~GuardCondition(), GuardCondition, ); diff --git a/rmw_zenoh_cpp/src/rmw_qos.cpp b/rmw_zenoh_cpp/src/rmw_qos.cpp index 8ae5b4f2..0dcf6a4d 100644 --- a/rmw_zenoh_cpp/src/rmw_qos.cpp +++ b/rmw_zenoh_cpp/src/rmw_qos.cpp @@ -16,6 +16,8 @@ #include "rmw/types.h" #include "rmw/qos_profiles.h" +#include "rmw_dds_common/qos.hpp" + extern "C" { rmw_ret_t @@ -26,28 +28,13 @@ rmw_qos_profile_check_compatible( char * reason, size_t reason_size) { - if (!compatibility) { - RMW_SET_ERROR_MSG("compatibility parameter is null"); - return RMW_RET_INVALID_ARGUMENT; - } - - if (!reason && reason_size != 0u) { - RMW_SET_ERROR_MSG("reason parameter is null, but reason_size parameter is not zero"); - return RMW_RET_INVALID_ARGUMENT; - } - - // Presume profiles are compatible until proven otherwise - *compatibility = RMW_QOS_COMPATIBILITY_OK; - - // Initialize reason buffer - if (reason && reason_size != 0u) { - reason[0] = '\0'; - } - - // TODO(clalancette): check compatibility in Zenoh QOS profiles. - (void)publisher_profile; - (void)subscription_profile; - - return RMW_RET_OK; + // In Zenoh, publishers do not have any reliability settings. + // A publisher and subscription are only incompatible if the durability of the publisher is + // TRANSIENT_LOCAL but that of the subscription is not. In such a scenario, a late-joining + // subscription can fail to receive messages so we flag it accordingly. + // However, we can reuse the qos_profile_check_compatible() method from rmw_dds_common + // since it largely applies in rmw_zenoh. + return rmw_dds_common::qos_profile_check_compatible( + publisher_profile, subscription_profile, compatibility, reason, reason_size); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index c510db3e..5f9af074 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -55,6 +55,18 @@ #include "rmw/validate_namespace.h" #include "rmw/validate_node_name.h" +#include "rmw_dds_common/qos.hpp" + +// If the depth field in the qos profile is set to 0, the RMW implementation +// has the liberty to assign a default depth. The zenoh transport protocol +// is configured with 256 channels so theoretically, this would be the maximum +// depth we can set before blocking transport. A high depth would increase the +// memory footprint of processes as more messages are stored in memory while a +// very low depth might unintentionally drop messages leading to a poor +// out-of-the-box experience for new users. For now we set the depth to 42, +// a popular "magic number". See https://en.wikipedia.org/wiki/42_(number). +#define RMW_ZENOH_DEFAULT_HISTORY_DEPTH 42; + namespace { @@ -459,13 +471,6 @@ rmw_create_publisher( return nullptr; } } - // Adapt any 'best available' QoS options - // rmw_qos_profile_t adapted_qos_profile = *qos_profile; - // rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( - // node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); - // if (RMW_RET_OK != ret) { - // return nullptr; - // } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); if (publisher_options->require_unique_network_flow_endpoints == RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) @@ -534,6 +539,13 @@ rmw_create_publisher( publisher_data->~rmw_publisher_data_t(), rmw_publisher_data_t); }); + // Adapt any 'best available' QoS options + publisher_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher( + node, topic_name, &publisher_data->adapted_qos_profile, rmw_get_subscriptions_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } publisher_data->typesupport_identifier = type_support->typesupport_identifier; publisher_data->type_support_impl = type_support->data; auto callbacks = static_cast(type_support->data); @@ -583,8 +595,6 @@ rmw_create_publisher( allocator->deallocate(const_cast(rmw_publisher->topic_name), allocator->state); }); - // TODO(yadunund): Parse adapted_qos_profile and publisher_options to generate - // a z_publisher_put_options struct instead of passing NULL to this function. z_owned_keyexpr_t keyexpr = ros_topic_name_to_zenoh_key( topic_name, node->context->actual_domain_id, allocator); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( @@ -595,11 +605,41 @@ rmw_create_publisher( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } + + // Create a Publication Cache if durability is transient_local. + if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; + publisher_data->pub_cache = ze_declare_publication_cache( + z_loan(context_impl->session), + z_loan(keyexpr), + &pub_cache_opts + ); + if (!publisher_data->pub_cache.has_value() || !z_check(publisher_data->pub_cache.value())) { + RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); + return nullptr; + } + } + auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( + [publisher_data]() { + if (publisher_data && publisher_data->pub_cache.has_value()) { + z_drop(z_move(publisher_data->pub_cache.value())); + } + }); + + // Set congestion_control to BLOCK if appropriate. + z_publisher_options_t opts = z_publisher_options_default(); + opts.congestion_control = Z_CONGESTION_CONTROL_DROP; + if (publisher_data->adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL && + publisher_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) + { + opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + } // TODO(clalancette): What happens if the key name is a valid but empty string? publisher_data->pub = z_declare_publisher( z_loan(context_impl->session), z_loan(keyexpr), - NULL + &opts ); if (!z_check(publisher_data->pub)) { RMW_SET_ERROR_MSG("unable to create zenoh publisher"); @@ -632,7 +672,7 @@ rmw_create_publisher( liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, - publisher_data->type_support->get_name(), "reliable"} + publisher_data->type_support->get_name(), publisher_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -659,6 +699,7 @@ rmw_create_publisher( } free_token.cancel(); + undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); free_topic_name.cancel(); destruct_msg_type_support.cancel(); @@ -714,7 +755,9 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) // return RMW_RET_ERROR; // } z_drop(z_move(publisher_data->token)); - + if (publisher_data->pub_cache.has_value()) { + z_drop(z_move(publisher_data->pub_cache.value())); + } RMW_TRY_DESTRUCTOR(publisher_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(publisher_data->type_support, allocator->state); if (z_undeclare_publisher(z_move(publisher_data->pub))) { @@ -834,31 +877,37 @@ rmw_publish( // To store serialized message byte array. char * msg_bytes = nullptr; - bool from_shm = false; + std::optional shmbuf = std::nullopt; + auto always_free_shmbuf = rcpputils::make_scope_exit( + [&shmbuf]() { + if (shmbuf.has_value()) { + zc_shmbuf_drop(&shmbuf.value()); + } + }); auto free_msg_bytes = rcpputils::make_scope_exit( - [msg_bytes, allocator, from_shm]() { - if (msg_bytes && !from_shm) { + [msg_bytes, allocator, &shmbuf]() { + if (msg_bytes && !shmbuf.has_value()) { allocator->deallocate(msg_bytes, allocator->state); } }); - zc_owned_shmbuf_t shmbuf; // Get memory from SHM buffer if available. - if (zc_shm_manager_check(&publisher_data->context->impl->shm_manager)) { + if (publisher_data->context->impl->shm_manager.has_value() && + zc_shm_manager_check(&publisher_data->context->impl->shm_manager.value())) + { shmbuf = zc_shm_alloc( - &publisher_data->context->impl->shm_manager, + &publisher_data->context->impl->shm_manager.value(), max_data_length); - if (!z_check(shmbuf)) { - zc_shm_gc(&publisher_data->context->impl->shm_manager); - shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager, max_data_length); - if (!z_check(shmbuf)) { + if (!z_check(shmbuf.value())) { + zc_shm_gc(&publisher_data->context->impl->shm_manager.value()); + shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager.value(), max_data_length); + if (!z_check(shmbuf.value())) { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); return RMW_RET_ERROR; } } - msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf)); - from_shm = true; + msg_bytes = reinterpret_cast(zc_shmbuf_ptr(&shmbuf.value())); } else { // Get memory from the allocator. msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); @@ -892,9 +941,9 @@ rmw_publish( z_publisher_put_options_t options = z_publisher_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_EMPTY, NULL); - if (from_shm) { - zc_shmbuf_set_length(&shmbuf, data_length); - zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf)); + if (shmbuf.has_value()) { + zc_shmbuf_set_length(&shmbuf.value(), data_length); + zc_owned_payload_t payload = zc_shmbuf_into_payload(z_move(shmbuf.value())); ret = zc_publisher_put_owned(z_loan(publisher_data->pub), z_move(payload), &options); } else { // Returns 0 if success. @@ -933,11 +982,23 @@ rmw_publisher_count_matched_subscriptions( const rmw_publisher_t * publisher, size_t * subscription_count) { - static_cast(publisher); - static_cast(subscription_count); - // TODO(yadunund): Fixme. - *subscription_count = 0; - return RMW_RET_OK; + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription_count, RMW_RET_INVALID_ARGUMENT); + + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + + return context_impl->graph_cache.publisher_count_matched_subscriptions( + publisher, subscription_count); } //============================================================================== @@ -947,9 +1008,18 @@ rmw_publisher_get_actual_qos( const rmw_publisher_t * publisher, rmw_qos_profile_t * qos) { - static_cast(publisher); - static_cast(qos); - // TODO(yadunund): Fixme. + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_publisher_data_t * pub_data = static_cast(publisher->data); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + + *qos = pub_data->adapted_qos_profile; return RMW_RET_OK; } @@ -1151,21 +1221,8 @@ rmw_create_subscription( return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); - // Adapt any 'best available' QoS options - // rmw_qos_profile_t adapted_qos_profile = *qos_profile; - // rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( - // node, topic_name, &adapted_qos_profile, rmw_get_publishers_info_by_topic); - // if (RMW_RET_OK != ret) { - // return nullptr; - // } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); - // Check RMW QoS - // if (!is_valid_qos(*qos_profile)) { - // RMW_SET_ERROR_MSG("create_subscription() called with invalid QoS"); - // return nullptr; - // } - const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { // error was already set by find_message_type_support @@ -1231,14 +1288,19 @@ rmw_create_subscription( rmw_subscription_data_t); }); - // Set the reliability of the subscription options based on qos_profile. - // The default options will be initialized with Best Effort reliability. - auto sub_options = z_subscriber_options_default(); - sub_data->reliable = false; - if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - sub_data->reliable = true; + // Adapt any 'best available' QoS options + sub_data->adapted_qos_profile = *qos_profile; + rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_subscription( + node, topic_name, &sub_data->adapted_qos_profile, rmw_get_publishers_info_by_topic); + if (RMW_RET_OK != ret) { + RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + return nullptr; } + // If a depth of 0 was provided, the RMW implementation should choose a suitable default. + sub_data->adapted_qos_profile.depth = + sub_data->adapted_qos_profile.depth > 0 ? + sub_data->adapted_qos_profile.depth : + RMW_ZENOH_DEFAULT_HISTORY_DEPTH; sub_data->typesupport_identifier = type_support->typesupport_identifier; sub_data->type_support_impl = type_support->data; @@ -1266,7 +1328,6 @@ rmw_create_subscription( MessageTypeSupport); }); - sub_data->queue_depth = qos_profile->depth; sub_data->context = node->context; rmw_subscription->implementation_identifier = rmw_zenoh_identifier; @@ -1306,19 +1367,70 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - sub_data->sub = z_declare_subscriber( - z_loan(context_impl->session), - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(sub_data->sub)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return nullptr; + // Instantiate the subscription with suitable options depending on the + // adapted_qos_profile. + // TODO(Yadunund): Rely on a separate function to return the sub + // as we start supporting more qos settings. + z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); + auto always_drop_keystr = rcpputils::make_scope_exit( + [&owned_key_str]() { + z_drop(z_move(owned_key_str)); + }); + + if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); + // TODO(Yadunund): We need the query_target to be ALL_COMPLETE but the PubCache created + // does not setup a queryable that is complete. Once zettascale exposes that API, + // uncomment below. + // sub_options.query_target = Z_QUERY_TARGET_ALL_COMPLETE; + sub_options.query_target = Z_QUERY_TARGET_ALL; + // We set consolidation to none as we need to receive transient local messages + // from a number of publishers. Eg: To receive TF data published over /tf_static + // by various publishers. + sub_options.query_consolidation = z_query_consolidation_none(); + if (sub_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + } + sub_data->sub = ze_declare_querying_subscriber( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } + } else { + // Create a regular subscriber for all other durability settings. + z_subscriber_options_t sub_options = z_subscriber_options_default(); + if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + } + sub_data->sub = z_declare_subscriber( + z_loan(context_impl->session), + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } } + auto undeclare_z_sub = rcpputils::make_scope_exit( [sub_data]() { - z_undeclare_subscriber(z_move(sub_data->sub)); + z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); + if (sub == nullptr || z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_data->sub); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } + } }); // Publish to the graph that a new subscription is in town @@ -1327,7 +1439,7 @@ rmw_create_subscription( liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, - sub_data->type_support->get_name(), "reliable"} + sub_data->type_support->get_name(), sub_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -1395,9 +1507,19 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); - if (z_undeclare_subscriber(z_move(sub_data->sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - ret = RMW_RET_ERROR; + z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); + if (sub != nullptr) { + if (z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + ret = RMW_RET_ERROR; + } + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_data->sub); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + ret = RMW_RET_ERROR; + } } RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); @@ -1416,12 +1538,23 @@ rmw_subscription_count_matched_publishers( const rmw_subscription_t * subscription, size_t * publisher_count) { - static_cast(subscription); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher_count, RMW_RET_INVALID_ARGUMENT); - // TODO(clalancette): implement - *publisher_count = 0; + rmw_subscription_data_t * sub_data = static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return RMW_RET_UNSUPPORTED; + return context_impl->graph_cache.subscription_count_matched_publishers( + subscription, publisher_count); } //============================================================================== @@ -1442,8 +1575,7 @@ rmw_subscription_get_actual_qos( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - qos->reliability = sub_data->reliable ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : - RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + *qos = sub_data->adapted_qos_profile; return RMW_RET_OK; } @@ -1890,7 +2022,7 @@ rmw_create_client( liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, - std::move(service_type), "reliable"} + std::move(service_type), client_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2303,9 +2435,18 @@ rmw_client_request_publisher_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { - // TODO(francocipollone): Fix. - static_cast(client); - static_cast(qos); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_client_data_t * client_data = static_cast(client->data); + RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); + + *qos = client_data->adapted_qos_profile; return RMW_RET_OK; } @@ -2316,10 +2457,8 @@ rmw_client_response_subscription_get_actual_qos( const rmw_client_t * client, rmw_qos_profile_t * qos) { - // TODO(francocipollone): Fix. - static_cast(client); - static_cast(qos); - return RMW_RET_OK; + // The same QoS profile is used for sending requests and receiving responses. + return rmw_client_request_publisher_get_actual_qos(client, qos); } //============================================================================== @@ -2413,6 +2552,10 @@ rmw_create_service( rmw_service_data_t); }); + // Adapt any 'best available' QoS options + service_data->adapted_qos_profile = + rmw_dds_common::qos_profile_update_best_available_for_services(*qos_profiles); + // Get the RMW type support. const rosidl_service_type_support_t * type_support = find_service_type_support(type_supports); if (type_support == nullptr) { @@ -2539,7 +2682,7 @@ rmw_create_service( liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, - std::move(service_type), "reliable"} + std::move(service_type), service_data->adapted_qos_profile} ); if (!liveliness_entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( @@ -2833,9 +2976,18 @@ rmw_service_request_subscription_get_actual_qos( const rmw_service_t * service, rmw_qos_profile_t * qos) { - // TODO(yadunund): Fix. - static_cast(service); - static_cast(qos); + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + rmw_zenoh_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + rmw_service_data_t * service_data = static_cast(service->data); + RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); + + *qos = service_data->adapted_qos_profile; return RMW_RET_OK; } @@ -2846,10 +2998,8 @@ rmw_service_response_publisher_get_actual_qos( const rmw_service_t * service, rmw_qos_profile_t * qos) { - // TODO(yadunund): Fix. - static_cast(service); - static_cast(qos); - return RMW_RET_OK; + // The same QoS profile is used for receiving requests and sending responses. + return rmw_service_request_subscription_get_actual_qos(service, qos); } //============================================================================== diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_c_vendor/CMakeLists.txt index 2b3ebdf4..c46edbdc 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_c_vendor/CMakeLists.txt @@ -22,7 +22,7 @@ find_package(ament_cmake_vendor_package REQUIRED) # Note: We separate the two args needed for cargo with "$" and not ";" as the # latter is a list separater in cmake and hence the string will be split into two # when expanded. -set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/transport_tcp") +set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=zenoh/transport_tcp zenoh/shared-memory") ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git