Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support basic QoS settings #91

Merged
merged 36 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1a3b459
Create querying subscriber if qos is transient local
Yadunund Jan 11, 2024
3f5a607
Update graph cache and introspection methods with qos
Yadunund Jan 11, 2024
eed1d12
Initialize pubcache only if durability is transient local
Yadunund Jan 11, 2024
32c788b
Set publisher congestion when appropriate
Yadunund Jan 11, 2024
eb91c0e
Make liveliness tokens more compact
Yadunund Jan 11, 2024
21a0d8c
Fix rmw_publisher_count_matched_subscriptions
Yadunund Jan 11, 2024
4e86b10
Explictly map qos settings
Yadunund Jan 15, 2024
9d49ef3
Reuse qos_profile_check_compatibility() from rmw_dds_common
Yadunund Jan 15, 2024
b7a27be
Make transient_local work
Yadunund Jan 16, 2024
48bb3e5
Close zenoh session in rmw_shutdown
Yadunund Jan 18, 2024
2099b8b
store adapted_qos_profiles in pub/sub/client/service
Yadunund Jan 18, 2024
7d7df28
Implement rmw_subscription_count_matched_publishers
Yadunund Jan 18, 2024
cba604f
Use optional for pubcache and set consolidation for querying sub
Yadunund Jan 18, 2024
762668b
Set default consolidation for client query
Yadunund Jan 18, 2024
594bb97
Switch to non-blocking channel for liveliness_get
Yadunund Jan 18, 2024
804e915
Resolve merge conflicts after fixes to services
Yadunund Jan 19, 2024
02fee09
Make shm manager optional
Yadunund Jan 19, 2024
446aec3
Cleanup
Yadunund Jan 19, 2024
bf7727b
Merge branch 'rolling' into yadu/qos-durability
Yadunund Jan 23, 2024
e0a1de1
Merge branch 'rolling' into yadu/qos-durability
Yadunund Jan 23, 2024
5314de6
Explicitly ensure subscription depth is minimum 1
Yadunund Jan 23, 2024
a9b4245
Build zenoh-c with shm feature but do not enable it in the session co…
Yadunund Jan 24, 2024
cd02957
Always drop shmbuf
Yadunund Jan 24, 2024
34f2623
Add comment on why timestamping is enabled in zenoh configs
Yadunund Jan 24, 2024
52d16cb
Rely on strtol to get queue depth
Yadunund Jan 24, 2024
a664ac2
Resolve merge conflicts with rolling
Yadunund Jan 25, 2024
5c2b179
Merge branch 'rolling' into yadu/qos-durability
Yadunund Jan 26, 2024
2ad493d
Stick to blocking channel for liveliness get
Yadunund Jan 29, 2024
1298abd
Define default history depth
Yadunund Jan 31, 2024
66494e3
Update rmw_zenoh_cpp/src/rmw_zenoh.cpp
Yadunund Feb 7, 2024
a807343
Update rmw_zenoh_cpp/src/rmw_zenoh.cpp
Yadunund Feb 7, 2024
c807b34
Add example for liveliness keyexpr
Yadunund Feb 7, 2024
0098371
Simplify split_keyexpr
Yadunund Feb 7, 2024
e85d45d
Increase and justify default depth
Yadunund Feb 7, 2024
966a043
Update rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Yadunund Feb 7, 2024
d088360
Fix minor cpplint problems.
clalancette Feb 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(rmw_dds_common REQUIRED)
clalancette marked this conversation as resolved.
Show resolved Hide resolved
find_package(zenoh_c_vendor REQUIRED)
find_package(zenohc REQUIRED)

Expand Down Expand Up @@ -57,6 +58,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
rmw_dds_common::rmw_dds_common_library
zenohc::lib
)

Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
timestamping: {
/// Whether data messages should be timestamped if not already.
/// Accepts a single boolean value or different values for router, peer and client.
enabled: { router: true, peer: false, client: false },
enabled: { router: true, peer: true, client: false },
clalancette marked this conversation as resolved.
Show resolved Hide resolved
/// Whether data messages with timestamps in the future should be dropped or not.
/// If set to false (default), messages with timestamps in the future are retimestamped.
/// Timestamps are ignored if timestamping is disabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
timestamping: {
/// Whether data messages should be timestamped if not already.
/// Accepts a single boolean value or different values for router, peer and client.
enabled: { router: true, peer: false, client: false },
enabled: { router: true, peer: true, client: false },
/// Whether data messages with timestamps in the future should be dropped or not.
/// If set to false (default), messages with timestamps in the future are retimestamped.
/// Timestamps are ignored if timestamping is disabled.
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>rmw_dds_common</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
56 changes: 53 additions & 3 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rmw/validate_node_name.h"

#include "graph_cache.hpp"
#include "rmw_data_types.hpp"

///=============================================================================
using Entity = liveliness::Entity;
Expand Down Expand Up @@ -679,6 +680,46 @@ rmw_ret_t GraphCache::get_topic_names_and_types(
return fill_names_and_types(graph_topics_, allocator, topic_names_and_types);
}

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count)
{
// TODO(Yadunund): Check if QoS settings also match.
*subscription_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
if (topic_it != graph_topics_.end()) {
rmw_publisher_data_t * pub_data = static_cast<rmw_publisher_data_t *>(publisher->data);
GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find(
pub_data->type_support->get_name());
if (topic_data_it != topic_it->second.end()) {
*subscription_count = topic_data_it->second->stats_.sub_count_;
}
}

return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count)
{
// TODO(Yadunund): Check if QoS settings also match.
*publisher_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name);
if (topic_it != graph_topics_.end()) {
rmw_subscription_data_t * sub_data = static_cast<rmw_subscription_data_t *>(subscription->data);
GraphNode::TopicDataMap::const_iterator topic_data_it = topic_it->second.find(
sub_data->type_support->get_name());
if (topic_data_it != topic_it->second.end()) {
*publisher_count = topic_data_it->second->stats_.pub_count_;
}
}

return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::get_service_names_and_types(
rcutils_allocator_t * allocator,
Expand Down Expand Up @@ -893,7 +934,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
entity_type == EntityType::Publisher ? nodes[i]->pubs_ :
nodes[i]->subs_;
const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second;
for (const auto & [topic_data, _] : topic_data_map) {
for (const auto & [topic_type, topic_data] : topic_data_map) {
rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i];
endpoint_info = rmw_get_zero_initialized_topic_endpoint_info();

Expand All @@ -915,7 +956,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(

ret = rmw_topic_endpoint_info_set_topic_type(
&endpoint_info,
_demangle_if_ros_type(topic_data).c_str(),
_demangle_if_ros_type(topic_type).c_str(),
allocator);
if (RMW_RET_OK != ret) {
return ret;
Expand All @@ -927,7 +968,16 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
if (RMW_RET_OK != ret) {
return ret;
}
// TODO(Yadunund): Set type_hash, qos_profile, gid.

ret = rmw_topic_endpoint_info_set_qos_profile(
&endpoint_info,
&topic_data->info_.qos_
);
if (RMW_RET_OK != ret) {
return ret;
}

// TODO(Yadunund): Set type_hash, gid.
}
}

Expand Down
8 changes: 8 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ class GraphCache final
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count);

rmw_ret_t get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types) const;
Expand Down
168 changes: 127 additions & 41 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ NodeInfo::NodeInfo(
TopicInfo::TopicInfo(
std::string name,
std::string type,
std::string qos)
rmw_qos_profile_t qos)
: name_(std::move(name)),
type_(std::move(type)),
qos_(std::move(qos))
Expand All @@ -67,6 +67,8 @@ static const char SUB_STR[] = "MS";
static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char SLASH_REPLACEMENT = '%';
static const char QOS_DELIMITER = ':';
static const char QOS_HISTORY_DELIMITER = ',';

static const std::unordered_map<EntityType, std::string> entity_to_str = {
{EntityType::Node, NODE_STR},
Expand All @@ -84,18 +86,128 @@ static const std::unordered_map<std::string, EntityType> str_to_entity = {
{CLI_STR, EntityType::Client}
};

static const std::unordered_map<std::string, rmw_qos_history_policy_e> str_to_qos_history = {
{std::to_string(RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT), RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT},
{std::to_string(RMW_QOS_POLICY_HISTORY_KEEP_LAST), RMW_QOS_POLICY_HISTORY_KEEP_LAST},
{std::to_string(RMW_QOS_POLICY_HISTORY_KEEP_ALL), RMW_QOS_POLICY_HISTORY_KEEP_ALL},
{std::to_string(RMW_QOS_POLICY_HISTORY_UNKNOWN), RMW_QOS_POLICY_HISTORY_UNKNOWN}
};

static const std::unordered_map<std::string,
rmw_qos_reliability_policy_e> str_to_qos_reliability = {
{std::to_string(RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT),
RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT},
{std::to_string(RMW_QOS_POLICY_RELIABILITY_RELIABLE), RMW_QOS_POLICY_RELIABILITY_RELIABLE},
{std::to_string(RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT), RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT},
{std::to_string(RMW_QOS_POLICY_RELIABILITY_UNKNOWN), RMW_QOS_POLICY_RELIABILITY_UNKNOWN}
};

static const std::unordered_map<std::string, rmw_qos_durability_policy_e> str_to_qos_durability = {
{std::to_string(RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT),
RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT},
{std::to_string(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL),
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL},
{std::to_string(RMW_QOS_POLICY_DURABILITY_VOLATILE), RMW_QOS_POLICY_DURABILITY_VOLATILE},
{std::to_string(RMW_QOS_POLICY_DURABILITY_UNKNOWN), RMW_QOS_POLICY_DURABILITY_UNKNOWN}
};

std::string zid_to_str(z_id_t id)
{
std::stringstream ss;
ss << std::hex;
size_t i = 0;
for (; i < (sizeof(id.id) - 1); i++) {
ss << static_cast<int>(id.id[i]) << ".";
for (; i < (sizeof(id.id)); i++) {
ss << static_cast<int>(id.id[i]);
clalancette marked this conversation as resolved.
Show resolved Hide resolved
}
ss << static_cast<int>(id.id[i]);
return ss.str();
}

std::vector<std::string> split_keyexpr(
const std::string & keyexpr,
const char delim = '/')
{
std::vector<std::size_t> delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
delim_idx.push_back(-1);
std::size_t idx = 0;
for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) {
if (*it == delim) {
delim_idx.push_back(idx);
}
++idx;
}
std::vector<std::string> result = {};
try {
for (std::size_t i = 1; i < delim_idx.size(); ++i) {
const size_t prev_idx = delim_idx[i - 1];
const size_t idx = delim_idx[i];
result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1));
}
} catch (const std::exception & e) {
printf("%s\n", e.what());
return {};
}
// Finally add the last substr.
result.push_back(keyexpr.substr(delim_idx.back() + 1));
return result;
}
clalancette marked this conversation as resolved.
Show resolved Hide resolved

/**
* Convert a rmw_qos_profile_t to a string with format:
*
* <ReliabilityKind>:<DurabilityKind>:<HistoryKind>,<HistoryDepth>"
* Where:
* <ReliabilityKind> - enum value from rmw_qos_reliability_policy_e.
* <DurabilityKind> - enum value from rmw_qos_durability_policy_e.
* <HistoryKind> - enum value from rmw_qos_history_policy_e.
* <HistoryDepth> - The depth number.
*/
// TODO(Yadunund): Rely on maps to retrieve strings.
std::string qos_to_keyexpr(rmw_qos_profile_t qos)
clalancette marked this conversation as resolved.
Show resolved Hide resolved
{
std::string keyexpr = "";
keyexpr += std::to_string(qos.reliability);
keyexpr += QOS_DELIMITER;
keyexpr += std::to_string(qos.durability);
keyexpr += QOS_DELIMITER;
keyexpr += std::to_string(qos.history);
keyexpr += QOS_HISTORY_DELIMITER;
keyexpr += std::to_string(qos.depth);
return keyexpr;
}

/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid.
std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
{
rmw_qos_profile_t qos;
const std::vector<std::string> parts = split_keyexpr(keyexpr, QOS_DELIMITER);
if (parts.size() < 3) {
return std::nullopt;
}
const std::vector<std::string> history_parts = split_keyexpr(parts[2], QOS_HISTORY_DELIMITER);
if (history_parts.size() < 2) {
return std::nullopt;
}

try {
qos.history = str_to_qos_history.at(history_parts[0]);
qos.reliability = str_to_qos_reliability.at(parts[0]);
qos.durability = str_to_qos_durability.at(parts[1]);
} catch (const std::exception & e) {
return std::nullopt;
}
sscanf(history_parts[1].c_str(), "%zu", &qos.depth);
clalancette marked this conversation as resolved.
Show resolved Hide resolved

// Liveliness is always automatic given liveliness tokens.
qos.liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC;
qos.liveliness_lease_duration = RMW_QOS_LIVELINESS_LEASE_DURATION_DEFAULT;

// TODO(Yadunund): Update once we support these settings.
qos.deadline = RMW_QOS_DEADLINE_DEFAULT;
qos.lifespan = RMW_QOS_LIFESPAN_DEFAULT;
return qos;
}

} // namespace

///=============================================================================
Expand Down Expand Up @@ -156,7 +268,8 @@ Entity::Entity(
const auto & topic_info = this->topic_info_.value();
// Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/".
token_ss <<
"/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_;
"/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + qos_to_keyexpr(
topic_info.qos_);
}

this->keyexpr_ = token_ss.str();
Expand Down Expand Up @@ -186,41 +299,6 @@ std::optional<Entity> Entity::make(
return entity;
}

namespace
{
///=============================================================================
std::vector<std::string> split_keyexpr(
const std::string & keyexpr,
const char delim = '/')
{
std::vector<std::size_t> delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
delim_idx.push_back(-1);
std::size_t idx = 0;
for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) {
if (*it == delim) {
delim_idx.push_back(idx);
}
++idx;
}
std::vector<std::string> result = {};
try {
for (std::size_t i = 1; i < delim_idx.size(); ++i) {
const size_t prev_idx = delim_idx[i - 1];
const size_t idx = delim_idx[i];
result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1));
}
} catch (const std::exception & e) {
printf("%s\n", e.what());
return {};
}
// Finally add the last substr.
result.push_back(keyexpr.substr(delim_idx.back() + 1));
return result;
}

} // namespace

///=============================================================================
std::optional<Entity> Entity::make(const std::string & keyexpr)
{
Expand Down Expand Up @@ -276,10 +354,18 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
"Received liveliness token for non-node entity without required parameters.");
return std::nullopt;
}
std::optional<rmw_qos_profile_t> qos = keyexpr_to_qos(parts[8]);
if (!qos.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid qos keyexpr");
return std::nullopt;
}
topic_info = TopicInfo{
demangle_name(std::move(parts[6])),
std::move(parts[7]),
std::move(parts[8])};
std::move(qos.value())
};
}

return Entity{
Expand Down
5 changes: 3 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <vector>

#include "rmw/types.h"

namespace liveliness
{
Expand All @@ -45,12 +46,12 @@ struct TopicInfo
{
std::string name_;
std::string type_;
std::string qos_;
rmw_qos_profile_t qos_;

TopicInfo(
std::string name,
std::string type,
std::string qos);
rmw_qos_profile_t qos);
};

///=============================================================================
Expand Down
Loading
Loading