Skip to content

Commit

Permalink
Update graph cache and introspection methods with qos
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Jan 11, 2024
1 parent b7e329f commit ba2e2a5
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 57 deletions.
21 changes: 14 additions & 7 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,10 @@ void GraphCache::parse_put(const std::string & keyexpr)

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Added %s on topic %s with type %s and qos %s to node /%s.",
"Added %s on topic %s with type %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
topic_info.qos_.c_str(),
graph_node.name_.c_str());
};

Expand Down Expand Up @@ -409,11 +408,10 @@ void GraphCache::parse_del(const std::string & keyexpr)

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
"Removed %s on topic %s with type %s and qos %s to node /%s.",
"Removed %s on topic %s with type %s to node /%s.",
entity_desc.c_str(),
topic_info.name_.c_str(),
topic_info.type_.c_str(),
topic_info.qos_.c_str(),
graph_node.name_.c_str());
};

Expand Down Expand Up @@ -921,7 +919,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
entity_type == EntityType::Publisher ? nodes[i]->pubs_ :
nodes[i]->subs_;
const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second;
for (const auto & [topic_data, _] : topic_data_map) {
for (const auto & [topic_type, topic_data] : topic_data_map) {
rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i];
endpoint_info = rmw_get_zero_initialized_topic_endpoint_info();

Expand All @@ -943,7 +941,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(

ret = rmw_topic_endpoint_info_set_topic_type(
&endpoint_info,
_demangle_if_ros_type(topic_data).c_str(),
_demangle_if_ros_type(topic_type).c_str(),
allocator);
if (RMW_RET_OK != ret) {
return ret;
Expand All @@ -955,7 +953,16 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
if (RMW_RET_OK != ret) {
return ret;
}
// TODO(Yadunund): Set type_hash, qos_profile, gid.

ret = rmw_topic_endpoint_info_set_qos_profile(
&endpoint_info,
&topic_data->info_.qos_
);
if (RMW_RET_OK != ret) {
return ret;
}

// TODO(Yadunund): Set type_hash, gid.
}
}

Expand Down
135 changes: 97 additions & 38 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ NodeInfo::NodeInfo(
TopicInfo::TopicInfo(
std::string name,
std::string type,
std::string qos)
rmw_qos_profile_t qos)
: name_(std::move(name)),
type_(std::move(type)),
qos_(std::move(qos))
Expand All @@ -67,6 +67,7 @@ static const char SUB_STR[] = "MS";
static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char SLASH_REPLACEMENT = '%';
static const char QOS_DELIMITER = ':';

static const std::unordered_map<EntityType, std::string> entity_to_str = {
{EntityType::Node, NODE_STR},
Expand All @@ -84,6 +85,12 @@ static const std::unordered_map<std::string, EntityType> str_to_entity = {
{CLI_STR, EntityType::Client}
};

// Map string literals to qos enums.
// static const std::unordered_map<std::string, uint8_t> str_to_qos_e = {
// {"reliable", RMW_QOS_POLICY_RELIABILITY_RELIABLE},
// {"best_effort", RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT}
// };

std::string zid_to_str(z_id_t id)
{
std::stringstream ss;
Expand All @@ -96,6 +103,84 @@ std::string zid_to_str(z_id_t id)
return ss.str();
}

std::vector<std::string> split_keyexpr(
const std::string & keyexpr,
const char delim = '/')
{
std::vector<std::size_t> delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
delim_idx.push_back(-1);
std::size_t idx = 0;
for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) {
if (*it == delim) {
delim_idx.push_back(idx);
}
++idx;
}
std::vector<std::string> result = {};
try {
for (std::size_t i = 1; i < delim_idx.size(); ++i) {
const size_t prev_idx = delim_idx[i - 1];
const size_t idx = delim_idx[i];
result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1));
}
} catch (const std::exception & e) {
printf("%s\n", e.what());
return {};
}
// Finally add the last substr.
result.push_back(keyexpr.substr(delim_idx.back() + 1));
return result;
}

/**
* Convert a rmw_qos_profile_t to a string with format:
*
* <ReliabilityKind>:<DurabilityKind>:<HistoryKind>,<HistoryDepth>"
* Where:
* <ReliabilityKind> - "reliable" or "best_effort".
* <DurabilityKind> - "volatile" or "transient_local".
* <HistoryKind> - "keep_last".
* <HistoryDepth> - The depth number.
*/
// TODO(Yadunund): Rely on maps to retrieve strings.
std::string qos_to_keyexpr(rmw_qos_profile_t qos)
{
std::string keyexpr = "";
keyexpr += qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE ? "reliable" : "best_effort";
keyexpr += QOS_DELIMITER;
keyexpr += qos.durability ==
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" : "volatile";
keyexpr += QOS_DELIMITER;
// TODO(Yadunund): Update once we properly support History.
keyexpr += "keep_last,";
keyexpr += std::to_string(qos.depth);
return keyexpr;
}

/// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid.
std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr)
{
rmw_qos_profile_t qos;
const std::vector<std::string> parts = split_keyexpr(keyexpr, QOS_DELIMITER);
if (parts.size() < 3) {
return std::nullopt;
}
qos.reliability = parts[0] ==
"reliable" ? RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
qos.durability = parts[1] ==
"transient_local" ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL :
RMW_QOS_POLICY_DURABILITY_VOLATILE;
const std::vector<std::string> history_parts = split_keyexpr(parts[2], ',');
if (history_parts.size() < 2) {
return std::nullopt;
}
qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
sscanf(history_parts[1].c_str(), "%zu", &qos.depth);

return qos;
}

} // namespace

///=============================================================================
Expand Down Expand Up @@ -156,7 +241,8 @@ Entity::Entity(
const auto & topic_info = this->topic_info_.value();
// Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/".
token_ss <<
"/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_;
"/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + qos_to_keyexpr(
topic_info.qos_);
}

this->keyexpr_ = token_ss.str();
Expand Down Expand Up @@ -186,41 +272,6 @@ std::optional<Entity> Entity::make(
return entity;
}

namespace
{
///=============================================================================
std::vector<std::string> split_keyexpr(
const std::string & keyexpr,
const char delim = '/')
{
std::vector<std::size_t> delim_idx = {};
// Insert -1 for starting position to make the split easier when using substr.
delim_idx.push_back(-1);
std::size_t idx = 0;
for (std::string::const_iterator it = keyexpr.begin(); it != keyexpr.end(); ++it) {
if (*it == delim) {
delim_idx.push_back(idx);
}
++idx;
}
std::vector<std::string> result = {};
try {
for (std::size_t i = 1; i < delim_idx.size(); ++i) {
const size_t prev_idx = delim_idx[i - 1];
const size_t idx = delim_idx[i];
result.push_back(keyexpr.substr(prev_idx + 1, idx - prev_idx - 1));
}
} catch (const std::exception & e) {
printf("%s\n", e.what());
return {};
}
// Finally add the last substr.
result.push_back(keyexpr.substr(delim_idx.back() + 1));
return result;
}

} // namespace

///=============================================================================
std::optional<Entity> Entity::make(const std::string & keyexpr)
{
Expand Down Expand Up @@ -276,10 +327,18 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
"Received liveliness token for non-node entity without required parameters.");
return std::nullopt;
}
std::optional<rmw_qos_profile_t> qos = keyexpr_to_qos(parts[8]);
if (!qos.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid qos keyexpr");
return std::nullopt;
}
topic_info = TopicInfo{
demangle_name(std::move(parts[6])),
std::move(parts[7]),
std::move(parts[8])};
std::move(qos.value())
};
}

return Entity{
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <string>
#include <vector>

#include "rmw/types.h"


namespace liveliness
{
Expand All @@ -45,12 +47,12 @@ struct TopicInfo
{
std::string name_;
std::string type_;
std::string qos_;
rmw_qos_profile_t qos_;

TopicInfo(
std::string name,
std::string type,
std::string qos);
rmw_qos_profile_t qos);
};

///=============================================================================
Expand Down
20 changes: 10 additions & 10 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,12 @@ rmw_create_publisher(
}
}
// Adapt any 'best available' QoS options
// rmw_qos_profile_t adapted_qos_profile = *qos_profile;
// rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher(
// node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic);
// if (RMW_RET_OK != ret) {
// return nullptr;
// }
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = rmw_dds_common::qos_profile_get_best_available_for_topic_publisher(
node, topic_name, &adapted_qos_profile, rmw_get_subscriptions_info_by_topic);
if (RMW_RET_OK != ret) {
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);
if (publisher_options->require_unique_network_flow_endpoints ==
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED)
Expand Down Expand Up @@ -620,7 +620,7 @@ rmw_create_publisher(
liveliness::EntityType::Publisher,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_publisher->topic_name,
publisher_data->type_support->get_name(), "reliable"}
publisher_data->type_support->get_name(), adapted_qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -1334,7 +1334,7 @@ rmw_create_subscription(
liveliness::EntityType::Subscription,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_subscription->topic_name,
sub_data->type_support->get_name(), "reliable"}
sub_data->type_support->get_name(), adapted_qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -1905,7 +1905,7 @@ rmw_create_client(
liveliness::EntityType::Client,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_client->service_name,
std::move(service_type), "reliable"}
std::move(service_type), *qos_profile}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -2394,7 +2394,7 @@ rmw_create_service(
liveliness::EntityType::Service,
liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""},
liveliness::TopicInfo{rmw_service->service_name,
std::move(service_type), "reliable"}
std::move(service_type), *qos_profiles}
);
if (!liveliness_entity.has_value()) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down

0 comments on commit ba2e2a5

Please sign in to comment.