diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index a66af168..4fa7b445 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -40,6 +40,6 @@ jobs: with: package-name: | rmw_zenoh_cpp - zenoh_c_vendor + zenoh_cpp_vendor target-ros2-distro: ${{ matrix.ROS_DISTRO }} vcs-repo-file-url: ${{ matrix.BUILD_TYPE == 'source' && env.ROS2_REPOS_FILE_URL || '' }} diff --git a/README.md b/README.md index 164fe0f8..0359f92b 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Build `rmw_zenoh_cpp` >Note: By default, we vendor and compile `zenoh-c` with a subset of `zenoh` features. The `ZENOHC_CARGO_FLAGS` CMake argument may be overwritten with other features included if required. -See [zenoh_c_vendor/CMakeLists.txt](./zenoh_c_vendor/CMakeLists.txt) for more details. +See [zenoh_cpp_vendor/CMakeLists.txt](./zenoh_cpp_vendor/CMakeLists.txt) for more details. ```bash mkdir ~/ws_rmw_zenoh/src -p && cd ~/ws_rmw_zenoh/src diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 89b5a598..185fa804 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -21,7 +21,7 @@ find_package(rcutils REQUIRED) find_package(rosidl_typesupport_fastrtps_c REQUIRED) find_package(rosidl_typesupport_fastrtps_cpp REQUIRED) find_package(rmw REQUIRED) -find_package(zenoh_c_vendor REQUIRED) +find_package(zenoh_cpp_vendor REQUIRED) add_library(rmw_zenoh_cpp SHARED src/detail/attachment_helpers.cpp @@ -45,7 +45,6 @@ add_library(rmw_zenoh_cpp SHARED src/detail/type_support.cpp src/detail/type_support_common.cpp src/detail/zenoh_config.cpp - src/detail/zenoh_router_check.cpp src/detail/zenoh_utils.cpp src/rmw_event.cpp src/rmw_get_network_flow_endpoints.cpp @@ -68,7 +67,7 @@ target_link_libraries(rmw_zenoh_cpp rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp rmw::rmw - zenohc::lib + zenohcxx::zenohc ) configure_rmw_library(rmw_zenoh_cpp) @@ -130,7 +129,7 @@ target_link_libraries(rmw_zenohd rcutils::rcutils rcpputils::rcpputils rmw::rmw - zenohc::lib + zenohcxx::zenohc ) install( diff --git a/rmw_zenoh_cpp/package.xml b/rmw_zenoh_cpp/package.xml index e1d2a24a..79272665 100644 --- a/rmw_zenoh_cpp/package.xml +++ b/rmw_zenoh_cpp/package.xml @@ -15,8 +15,8 @@ ament_cmake - zenoh_c_vendor - zenoh_c_vendor + zenoh_cpp_vendor + zenoh_cpp_vendor ament_index_cpp fastcdr diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index da2571ce..37c7f335 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include #include +#include #include #include +#include + +#include #include "rmw/types.h" @@ -27,75 +29,24 @@ namespace rmw_zenoh_cpp { -///============================================================================= + AttachmentData::AttachmentData( - const int64_t sequence_number, - const int64_t source_timestamp, - const uint8_t source_gid[RMW_GID_STORAGE_SIZE]) -: sequence_number_(sequence_number), - source_timestamp_(source_timestamp) + const int64_t _sequence_number, + const int64_t _source_timestamp, + const std::vector _source_gid) { - memcpy(source_gid_, source_gid, RMW_GID_STORAGE_SIZE); + sequence_number_ = _sequence_number; + source_timestamp_ = _source_timestamp; + source_gid_ = _source_gid; gid_hash_ = hash_gid(source_gid_); } -///============================================================================= AttachmentData::AttachmentData(AttachmentData && data) -: sequence_number_(std::move(data.sequence_number_)), - source_timestamp_(std::move(data.source_timestamp_)), - gid_hash_(std::move(data.gid_hash_)) -{ - memcpy(source_gid_, data.source_gid_, RMW_GID_STORAGE_SIZE); -} - -///============================================================================= -AttachmentData::AttachmentData(const z_loaned_bytes_t * attachment) { - ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); - z_owned_string_t key; - - // Deserialize the sequence_number - ze_deserializer_deserialize_string(&deserializer, &key); - if (std::string_view( - z_string_data(z_loan(key)), - z_string_len(z_loan(key))) != "sequence_number") - { - throw std::runtime_error("sequence_number is not found in the attachment."); - } - z_drop(z_move(key)); - if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number_)) { - throw std::runtime_error("Failed to deserialize the sequence_number."); - } - - // Deserialize the source_timestamp - ze_deserializer_deserialize_string(&deserializer, &key); - if (std::string_view( - z_string_data(z_loan(key)), - z_string_len(z_loan(key))) != "source_timestamp") - { - throw std::runtime_error("source_timestamp is not found in the attachment"); - } - z_drop(z_move(key)); - if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp_)) { - throw std::runtime_error("Failed to deserialize the source_timestamp."); - } - - // Deserialize the source_gid - ze_deserializer_deserialize_string(&deserializer, &key); - if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") { - throw std::runtime_error("Invalid attachment: the key source_gid is not found"); - } - z_drop(z_move(key)); - z_owned_slice_t slice; - if (ze_deserializer_deserialize_slice(&deserializer, &slice)) { - throw std::runtime_error("Failed to deserialize the source_gid."); - } - if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) { - throw std::runtime_error("The length of source_gid mismatched."); - } - memcpy(this->source_gid_, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); - z_drop(z_move(slice)); - gid_hash_ = hash_gid(this->source_gid_); + gid_hash_ = std::move(data.gid_hash_); + sequence_number_ = std::move(data.sequence_number_); + source_timestamp_ = std::move(data.source_timestamp_); + source_gid_ = data.source_gid_; } ///============================================================================= @@ -111,9 +62,9 @@ int64_t AttachmentData::source_timestamp() const } ///============================================================================= -void AttachmentData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +std::vector AttachmentData::copy_gid() const { - memcpy(out_gid, source_gid_, RMW_GID_STORAGE_SIZE); + return source_gid_; } ///============================================================================= @@ -122,17 +73,38 @@ size_t AttachmentData::gid_hash() const return gid_hash_; } -///============================================================================= -void AttachmentData::serialize_to_zbytes(z_owned_bytes_t * attachment) +zenoh::Bytes AttachmentData::serialize_to_zbytes() +{ + auto serializer = zenoh::ext::Serializer(); + serializer.serialize(std::string("sequence_number")); + serializer.serialize(this->sequence_number_); + serializer.serialize(std::string("source_timestamp")); + serializer.serialize(this->source_timestamp_); + serializer.serialize(std::string("source_gid")); + serializer.serialize(this->source_gid_); + return std::move(serializer).finish(); +} + +AttachmentData::AttachmentData(const zenoh::Bytes & attachment) { - ze_owned_serializer_t serializer; - ze_serializer_empty(&serializer); - ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number_); - ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); - ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp_); - ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); - ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid_, RMW_GID_STORAGE_SIZE); - ze_serializer_finish(z_move(serializer), attachment); + zenoh::ext::Deserializer deserializer(std::move(attachment)); + const auto sequence_number_str = deserializer.deserialize(); + if (sequence_number_str != "sequence_number") { + throw std::runtime_error("sequence_number is not found in the attachment."); + } + this->sequence_number_ = deserializer.deserialize(); + + const auto source_timestamp_str = deserializer.deserialize(); + if (source_timestamp_str != "source_timestamp") { + throw std::runtime_error("source_timestamp is not found in the attachment."); + } + this->source_timestamp_ = deserializer.deserialize(); + + const auto source_gid_str = deserializer.deserialize(); + if (source_gid_str != "source_gid") { + throw std::runtime_error("source_gid is not found in the attachment."); + } + this->source_gid_ = deserializer.deserialize>(); + gid_hash_ = hash_gid(this->source_gid_); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index ec460620..7ae57709 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -15,34 +15,37 @@ #ifndef DETAIL__ATTACHMENT_HELPERS_HPP_ #define DETAIL__ATTACHMENT_HELPERS_HPP_ -#include +#include +#include + +#include #include "rmw/types.h" namespace rmw_zenoh_cpp { -///============================================================================= class AttachmentData final { public: - AttachmentData( - const int64_t sequence_number, - const int64_t source_timestamp, - const uint8_t source_gid[RMW_GID_STORAGE_SIZE]); - explicit AttachmentData(const z_loaned_bytes_t *); + explicit AttachmentData( + const int64_t _sequence_number, + const int64_t _source_timestamp, + const std::vector _source_gid); + + explicit AttachmentData(const zenoh::Bytes & bytes); explicit AttachmentData(AttachmentData && data); int64_t sequence_number() const; int64_t source_timestamp() const; - void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::vector copy_gid() const; size_t gid_hash() const; - void serialize_to_zbytes(z_owned_bytes_t *); + zenoh::Bytes serialize_to_zbytes(); private: int64_t sequence_number_; int64_t source_timestamp_; - uint8_t source_gid_[RMW_GID_STORAGE_SIZE]; + std::vector source_gid_; size_t gid_hash_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b622e06d..e0795ac8 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -69,8 +69,8 @@ TopicData::TopicData(ConstEntityPtr entity) } ///============================================================================= -GraphCache::GraphCache(const z_id_t & zid) -: zid_str_(liveliness::zid_to_str(zid)) +GraphCache::GraphCache(const std::string & zid) +: zid_str_(zid) { // Do nothing. } @@ -1160,7 +1160,7 @@ rmw_ret_t GraphCache::get_entities_info_by_topic( } memset(ep.endpoint_gid, 0, RMW_GID_STORAGE_SIZE); - entity->copy_gid(ep.endpoint_gid); + memcpy(ep.endpoint_gid, entity->copy_gid().data(), RMW_GID_STORAGE_SIZE); endpoints.push_back(ep); } diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 8ac2172d..e9eecb5d 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -110,7 +110,7 @@ class GraphCache final /// @param id The id of the zenoh session that is building the graph cache. /// This is used to infer which entities originated from the current session /// so that appropriate event callbacks may be triggered. - explicit GraphCache(const z_id_t & zid); + explicit GraphCache(const std::string & zid); // Parse a PUT message over a token's key-expression and update the graph. void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false); diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 52c8ec7f..0d22d66f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -26,6 +26,8 @@ #include #include +#include + #include "logging_macros.hpp" #include "qos.hpp" #include "simplified_xxhash3.hpp" @@ -371,16 +373,6 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) return qos; } -///============================================================================= -std::string zid_to_str(const z_id_t & id) -{ - z_owned_string_t z_str; - z_id_to_string(&id, &z_str); - std::string str(z_string_data(z_loan(z_str)), z_string_len(z_loan(z_str))); - z_drop(z_move(z_str)); - return str; -} - ///============================================================================= std::string subscription_token(size_t domain_id) { @@ -445,8 +437,10 @@ Entity::Entity( // returned to the RMW layer as necessary. simplified_XXH128_hash_t keyexpr_gid = simplified_XXH3_128bits(this->liveliness_keyexpr_.c_str(), this->liveliness_keyexpr_.length()); - memcpy(this->gid_, &keyexpr_gid.low64, sizeof(keyexpr_gid.low64)); - memcpy(this->gid_ + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, sizeof(keyexpr_gid.high64)); + this->gid_.resize(RMW_GID_STORAGE_SIZE); + memcpy(this->gid_.data(), &keyexpr_gid.low64, sizeof(keyexpr_gid.low64)); + memcpy(this->gid_.data() + sizeof(keyexpr_gid.low64), &keyexpr_gid.high64, + sizeof(keyexpr_gid.high64)); // We also hash the liveliness keyexpression into a size_t that we use to index into our maps. this->keyexpr_hash_ = hash_gid(this->gid_); @@ -454,7 +448,7 @@ Entity::Entity( ///============================================================================= std::shared_ptr Entity::make( - z_id_t zid, + zenoh::Id zid, const std::string & nid, const std::string & id, EntityType type, @@ -480,7 +474,7 @@ std::shared_ptr Entity::make( return std::make_shared( Entity{ - zid_to_str(zid), + std::string(zid.to_string()), nid, id, std::move(type), @@ -637,10 +631,15 @@ std::string Entity::liveliness_keyexpr() const return this->liveliness_keyexpr_; } -///============================================================================= void Entity::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const { - memcpy(out_gid, gid_, RMW_GID_STORAGE_SIZE); + memcpy(out_gid, gid_.data(), RMW_GID_STORAGE_SIZE); +} + +///============================================================================= +std::vector Entity::copy_gid() const +{ + return gid_; } ///============================================================================= @@ -679,7 +678,18 @@ std::string demangle_name(const std::string & input) } // namespace liveliness ///============================================================================= -size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]) +size_t hash_gid(const std::vector gid) +{ + std::stringstream hash_str; + hash_str << std::hex; + for (const auto & g : gid) { + hash_str << static_cast(g); + } + return std::hash{}(hash_str.str()); +} + +///============================================================================= +size_t hash_gid_p(const uint8_t gid[RMW_GID_STORAGE_SIZE]) { std::stringstream hash_str; hash_str << std::hex; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 895ae604..1096b45b 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__LIVELINESS_UTILS_HPP_ #define DETAIL__LIVELINESS_UTILS_HPP_ -#include - #include #include #include @@ -25,6 +23,8 @@ #include #include +#include + #include "rmw/types.h" namespace rmw_zenoh_cpp @@ -127,7 +127,7 @@ class Entity /// @param topic_info An optional topic information for relevant entities. /// @return An entity if all inputs are valid. This way no invalid entities can be created. static EntityPtr make( - z_id_t zid, + zenoh::Id zid, const std::string & nid, const std::string & id, EntityType type, @@ -174,6 +174,8 @@ class Entity void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::vector copy_gid() const; + private: Entity( std::string zid, @@ -191,7 +193,7 @@ class Entity NodeInfo node_info_; std::optional topic_info_; std::string liveliness_keyexpr_; - uint8_t gid_[RMW_GID_STORAGE_SIZE]; + std::vector gid_; }; ///============================================================================= @@ -231,15 +233,13 @@ std::string qos_to_keyexpr(const rmw_qos_profile_t & qos); ///============================================================================= /// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. std::optional keyexpr_to_qos(const std::string & keyexpr); - -///============================================================================= -/// Convert a Zenoh id to a string. -std::string zid_to_str(const z_id_t & id); } // namespace liveliness ///============================================================================= /// Generate a hash for a given GID. -size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]); +size_t hash_gid_p(const uint8_t gid[RMW_GID_STORAGE_SIZE]); + +size_t hash_gid(const std::vector gid); } // namespace rmw_zenoh_cpp ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp index a95c98a2..ea2847f6 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.cpp @@ -25,6 +25,9 @@ #include #include #include +#include + +#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -40,72 +43,11 @@ #include "rmw/get_topic_endpoint_info.h" #include "rmw/impl/cpp/macros.hpp" -namespace -{ - -///============================================================================= -void client_data_handler(z_loaned_reply_t * reply, void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t from data in client_data_handler." - ); - return; - } - - if (client_data->is_shutdown()) { - return; - } - - if (!z_reply_is_ok(reply)) { - const z_loaned_reply_err_t * err = z_reply_err(reply); - const z_loaned_bytes_t * err_payload = z_reply_err_payload(err); - - z_owned_string_t err_str; - z_bytes_to_string(err_payload, &err_str); - - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", - client_data->topic_info().topic_keyexpr_.c_str(), - static_cast(z_string_len(z_loan(err_str))), - z_string_data(z_loan(err_str))); - z_drop(z_move(err_str)); - - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - client_data->add_new_reply( - std::make_unique(reply, received_timestamp)); -} - -///============================================================================= -void client_data_drop(void * data) -{ - auto client_data = static_cast(data); - if (client_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain client_data_t from data in client_data_drop." - ); - return; - } - - client_data->decrement_in_flight_and_conditionally_remove(); -} - -} // namespace - namespace rmw_zenoh_cpp { ///============================================================================= std::shared_ptr ClientData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -167,7 +109,7 @@ std::shared_ptr ClientData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + session->get_zid(), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Client, @@ -232,45 +174,27 @@ ClientData::ClientData( } ///============================================================================= -bool ClientData::init(const z_loaned_session_t * session) +bool ClientData::init(const std::shared_ptr session) { - if (z_keyexpr_from_str( - &this->keyexpr_, - this->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK) - { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return false; - } - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->keyexpr_)); - }); - + std::string topic_keyexpr = this->entity_->topic_info().value().topic_keyexpr_; + keyexpr_ = zenoh::KeyExpr(topic_keyexpr); + // TODO(ahcorde) check KeyExpr std::string liveliness_keyexpr = this->entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - if (z_liveliness_declare_token( - session, - &this->token_, - z_loan(liveliness_ke), - NULL - ) != Z_OK) - { + zenoh::ZResult err; + this->token_ = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the client."); return false; } - auto free_token = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(this->token_)); - }); + sess_ = session; initialized_ = true; - free_ros_keyexpr.cancel(); - free_token.cancel(); - return true; } @@ -292,10 +216,9 @@ bool ClientData::liveliness_is_valid() const } ///============================================================================= -void ClientData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const +std::vector ClientData::copy_gid() const { - std::lock_guard lock(mutex_); - entity_->copy_gid(out_gid); + return entity_->copy_gid(); } ///============================================================================= @@ -308,15 +231,14 @@ void ClientData::add_new_reply(std::unique_ptr reply) reply_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - z_view_string_t keystr; - z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); + std::string keystr = std::string(keyexpr_.value().as_string_view()); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " "for client for %.*s", adapted_qos_profile.depth, - static_cast(z_string_len(z_loan(keystr))), - z_string_data(z_loan(keystr))); + keystr.size(), + keystr.c_str()); reply_queue_.pop_front(); } reply_queue_.emplace_back(std::move(reply)); @@ -346,47 +268,67 @@ rmw_ret_t ClientData::take_response( std::unique_ptr latest_reply = std::move(reply_queue_.front()); reply_queue_.pop_front(); - if (!latest_reply->get_sample().has_value()) { + auto & reply = latest_reply->get_sample(); + + if (!reply.is_ok()) { RMW_SET_ERROR_MSG("invalid reply sample"); return RMW_RET_ERROR; } - const z_loaned_sample_t * sample = latest_reply->get_sample().value(); + + const zenoh::Sample & sample = reply.get_ok(); // Object that manages the raw buffer - z_owned_slice_t payload; - z_bytes_to_slice(z_sample_payload(sample), &payload); - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), - z_slice_len(z_loan(payload))); + auto & payload = sample.get_payload(); + auto slice = payload.slice_iter().next(); + if (slice.has_value()) { + const uint8_t * payload = slice.value().data; + const size_t payload_len = slice.value().len; + + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload)), payload_len); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!response_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_response, + response_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS response"); + return RMW_RET_ERROR; + } - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!response_type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_response, - response_type_support_impl_)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS response"); - return RMW_RET_ERROR; - } + // Fill in the request_header + if (!sample.get_attachment().has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData take_request attachment is empty"); + return RMW_RET_ERROR; + } + rmw_zenoh_cpp::AttachmentData attachment(std::move(sample.get_attachment().value().get())); + request_header->request_id.sequence_number = attachment.sequence_number(); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); + return RMW_RET_ERROR; + } + request_header->source_timestamp = attachment.source_timestamp(); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + memcpy( + request_header->request_id.writer_guid, + attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); + request_header->received_timestamp = latest_reply->get_received_timestamp(); - // Fill in the request_header - AttachmentData attachment(z_sample_attachment(sample)); - request_header->request_id.sequence_number = attachment.sequence_number(); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } - request_header->source_timestamp = attachment.source_timestamp(); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + *taken = true; + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ClientData not able to get slice data"); return RMW_RET_ERROR; } - attachment.copy_gid(request_header->request_id.writer_guid); - request_header->received_timestamp = latest_reply->get_received_timestamp(); - - z_drop(z_move(payload)); - *taken = true; return RMW_RET_OK; } @@ -439,16 +381,10 @@ rmw_ret_t ClientData::send_request( *sequence_id = sequence_number_++; // Send request - z_get_options_t opts; - z_get_options_default(&opts); - z_owned_bytes_t attachment; - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - create_map_and_set_sequence_num( - &attachment, *sequence_id, - local_gid); - opts.attachment = z_move(attachment); - + zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); + std::vector local_gid; + local_gid = entity_->copy_gid(); + opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; // The default timeout for a z_get query is 10 seconds and if a response is not received within // this window, the queryable will return an invalid reply. However, it is common for actions, @@ -458,23 +394,64 @@ rmw_ret_t ClientData::send_request( // Latest consolidation guarantees unicity of replies for the same key expression, // which optimizes bandwidth. The default is "None", which imples replies may come in any order // and any number. - opts.consolidation = z_query_consolidation_latest(); - z_owned_bytes_t payload; - z_bytes_copy_from_buf( - &payload, reinterpret_cast(request_bytes), data_length); - opts.payload = z_move(payload); + opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; + + std::vector raw_bytes( + reinterpret_cast(request_bytes), + reinterpret_cast(request_bytes) + data_length); + opts.payload = zenoh::Bytes(raw_bytes); // TODO(Yadunund): Once we switch to zenoh-cpp with lambda closures, // capture shared_from_this() instead of this. num_in_flight_++; - z_owned_closure_reply_t zn_closure_reply; - z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this); - z_get( - context_impl->session(), - z_loan(keyexpr_), "", - z_move(zn_closure_reply), - &opts); + std::weak_ptr client_data = shared_from_this(); + zenoh::ZResult err; + std::string parameters; + context_impl->session()->get( + keyexpr_.value(), + parameters, + [client_data](const zenoh::Reply & reply) { + if (!reply.is_ok()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "z_reply_is_ok returned False Reason: %s", + reply.get_err().get_payload().as_string()) + return; + } + const zenoh::Sample & sample = reply.get_ok(); + + auto sub_data = client_data.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain ClientData from data for %s.", + std::string(sample.get_keyexpr().as_string_view())); + return; + } + + if (sub_data->is_shutdown()) { + return; + } + + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + sub_data->add_new_reply( + std::make_unique(reply, received_timestamp)); + }, + [client_data]() { + auto sub_data = client_data.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain ClientData"); + return; + } + sub_data->decrement_in_flight_and_conditionally_remove(); + }, + std::move(opts), + &err); return RMW_RET_OK; } @@ -530,11 +507,16 @@ void ClientData::_shutdown() } // Unregister this node from the ROS graph. - if (initialized_) { - z_liveliness_undeclare_token(z_move(token_)); - z_drop(z_move(keyexpr_)); + zenoh::ZResult err; + std::move(token_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return; } + sess_.reset(); is_shutdown_ = true; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp index bc737435..a59cb3f0 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_client_data.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__RMW_CLIENT_DATA_HPP_ #define DETAIL__RMW_CLIENT_DATA_HPP_ -#include - #include #include #include @@ -25,6 +23,9 @@ #include #include #include +#include + +#include #include "event.hpp" #include "liveliness_utils.hpp" @@ -47,7 +48,7 @@ class ClientData final : public std::enable_shared_from_this public: // Make a shared_ptr of ClientData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, const rmw_client_t * client, liveliness::NodeInfo node_info, @@ -64,7 +65,7 @@ class ClientData final : public std::enable_shared_from_this bool liveliness_is_valid() const; // Copy the GID of this ClientData into an rmw_gid_t. - void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::vector copy_gid() const; // Add a new ZenohReply to the queue. void add_new_reply(std::unique_ptr reply); @@ -119,7 +120,7 @@ class ClientData final : public std::enable_shared_from_this std::shared_ptr response_type_support); // Initialize the Zenoh objects for this entity. - bool init(const z_loaned_session_t * session); + bool init(const std::shared_ptr session); // Shutdown this client (the mutex is expected to be held by the caller). void _shutdown(); @@ -131,10 +132,12 @@ class ClientData final : public std::enable_shared_from_this const rmw_client_t * rmw_client_; // The Entity generated for the service. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned keyexpression. - z_owned_keyexpr_t keyexpr_; + std::optional keyexpr_; // Liveliness token for the service. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 9458eb45..6249bd8f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -26,13 +26,14 @@ #include #include +#include + #include "graph_cache.hpp" #include "guard_condition.hpp" #include "identifier.hpp" #include "logging_macros.hpp" #include "rmw_node_data.hpp" #include "zenoh_config.hpp" -#include "zenoh_router_check.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" @@ -48,8 +49,6 @@ static std::mutex data_to_data_shared_ptr_map_mutex; static std::unordered_map> data_to_data_shared_ptr_map; -static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data); - // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing // "this" ptr by reference. @@ -67,48 +66,57 @@ class rmw_context_impl_s::Data final nodes_({}) { // Initialize the zenoh configuration. - z_owned_config_t config; - rmw_ret_t ret; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) - { + std::optional config = rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session); + + if (!config.has_value()) { throw std::runtime_error("Error configuring Zenoh session."); } + zenoh::ZResult result; + // Check if shm is enabled. - z_owned_string_t shm_enabled; - zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); - auto always_free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); + std::string shm_enabled = config.value().get(Z_CONFIG_SHARED_MEMORY_KEY, &result); + if (result != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Not able to get %s from the config file", + Z_CONFIG_SHARED_MEMORY_KEY); + } // Initialize the zenoh session. - if (z_open(&session_, z_move(config), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error setting up zenoh session."); - throw std::runtime_error("Error setting up zenoh session."); + session_ = std::make_shared( + std::move(config.value()), + zenoh::Session::SessionOptions::create_default(), + &result); + if (result != Z_OK) { + throw std::runtime_error("Error setting up zenoh session. "); } - auto close_session = rcpputils::make_scope_exit( - [this]() { - z_close(z_loan_mut(session_), NULL); - }); + + rmw_ret_t ret; // Verify if the zenoh router is running if configured. const std::optional configured_connection_attempts = rmw_zenoh_cpp::zenoh_router_check_attempts(); if (configured_connection_attempts.has_value()) { uint64_t connection_attempts = 0; + // Retry until the connection is successful. constexpr std::chrono::milliseconds sleep_time(1000); constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time); - while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) { + while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { if ((connection_attempts % ticks_between_print) == 0) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "Unable to connect to a Zenoh router. " "Have you started a router with `ros2 run rmw_zenoh_cpp rmw_zenohd`?"); } + zenoh::ZResult err; + this->session_->get_routers_z_id(&err); + if (err != Z_OK) { + ++connection_attempts; + } else { + ret = RMW_RET_OK; + } if (++connection_attempts >= configured_connection_attempts.value()) { break; } @@ -117,8 +125,8 @@ class rmw_context_impl_s::Data final } // Initialize the graph cache. - const z_id_t zid = z_info_zid(z_loan(session_)); - graph_cache_ = std::make_shared(zid); + graph_cache_ = + std::make_shared(this->session_->get_zid().to_string()); // Setup liveliness subscriptions for discovery. std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id); @@ -137,58 +145,51 @@ class rmw_context_impl_s::Data final // the code will be simpler, and if we're just going to spin over the non-blocking // reads until we obtain responses, we'll just be hogging CPU time by convincing // the OS that we're doing actual work when it could instead park the thread. - z_owned_fifo_handler_reply_t handler; - z_owned_closure_reply_t closure; - z_fifo_channel_reply_new(&closure, &handler, SIZE_MAX - 1); - - z_view_keyexpr_t keyexpr; - z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str()); - z_liveliness_get( - z_loan(session_), z_loan(keyexpr), - z_move(closure), NULL); - z_owned_reply_t reply; - while (z_recv(z_loan(handler), &reply) == Z_OK) { - if (z_reply_is_ok(z_loan(reply))) { - const z_loaned_sample_t * sample = z_reply_ok(z_loan(reply)); - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache_->parse_put(std::move(liveliness_str), true); + + // Intuitively use a FIFO channel rather than building it up with a closure and a + // handler to FIFO channel + zenoh::KeyExpr keyexpr(liveliness_str); + + zenoh::Session::GetOptions options = zenoh::Session::GetOptions::create_default(); + options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL; + options.payload = ""; + + zenoh::ZResult err; + auto replies = session_->liveliness_get( + keyexpr, + zenoh::channels::FifoChannel(SIZE_MAX - 1), + zenoh::Session::LivelinessGetOptions::create_default(), + &err); + + if (err != Z_OK) { + throw std::runtime_error("Error getting liveliness. "); + } + + for (auto res = replies.recv(); std::holds_alternative(res); + res = replies.recv()) + { + const zenoh::Reply & reply = std::get(res); + if (reply.is_ok()) { + const auto & sample = reply.get_ok(); + graph_cache_->parse_put(std::string(sample.get_keyexpr().as_string_view()), true); } else { RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n"); } - z_drop(z_move(reply)); } - z_drop(z_move(handler)); // Initialize the shm manager if shared_memory is enabled in the config. shm_provider_ = std::nullopt; - if (strncmp( - z_string_data(z_loan(shm_enabled)), - "true", - z_string_len(z_loan(shm_enabled))) == 0) - { - // TODO(yuyuan): determine the default alignment of SHM - z_alloc_alignment_t alignment = {5}; - z_owned_memory_layout_t layout; - z_memory_layout_new(&layout, SHM_BUFFER_SIZE_MB * 1024 * 1024, alignment); - - z_owned_shm_provider_t provider; - if (z_posix_shm_provider_new(&provider, z_loan(layout)) != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "Unable to create an SHM provider."); - throw std::runtime_error("Unable to create an SHM provider."); + if (shm_enabled == "true") { + auto layout = zenoh::MemoryLayout( + SHM_BUFFER_SIZE_MB * 1024 * 1024, + zenoh::AllocAlignment({5})); + zenoh::PosixShmProvider provider(layout, &result); + if (result != Z_OK) { + throw std::runtime_error("Unable to create shm provider."); } - shm_provider_ = provider; + shm_provider_ = std::move(provider); } - auto free_shm_provider = rcpputils::make_scope_exit( - [this]() { - if (shm_provider_.has_value()) { - z_drop(z_move(shm_provider_.value())); - } - }); graph_guard_condition_ = std::make_unique(); graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; @@ -196,28 +197,37 @@ class rmw_context_impl_s::Data final // Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. - z_liveliness_subscriber_options_t sub_options; - z_liveliness_subscriber_options_default(&sub_options); - z_owned_closure_sample_t callback; - z_closure(&callback, graph_sub_data_handler, nullptr, this); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str()); - if (z_liveliness_declare_subscriber( - z_loan(session_), - &graph_subscriber_, z_loan(liveliness_ke), - z_move(callback), &sub_options) != Z_OK) - { + zenoh::KeyExpr keyexpr_cpp(liveliness_str.c_str()); + zenoh::Session::LivelinessSubscriberOptions sub_options = + zenoh::Session::LivelinessSubscriberOptions::create_default(); + sub_options.history = true; + graph_subscriber_cpp_ = session_->liveliness_declare_subscriber( + keyexpr_cpp, + [&](const zenoh::Sample & sample) { + // Look up the data shared_ptr in the global map. If it is in there, use it. + // If not, it is being shutdown so we can just ignore this update. + std::shared_ptr data_shared_ptr{nullptr}; + { + std::lock_guard lk(data_to_data_shared_ptr_map_mutex); + if (data_to_data_shared_ptr_map.count(this) == 0) { + return; + } + data_shared_ptr = data_to_data_shared_ptr_map[this]; + } + + // Update the graph cache. + data_shared_ptr->update_graph_cache( + sample, + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &err); + + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); throw std::runtime_error("Unable to subscribe to ROS graph updates."); } - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_undeclare_subscriber(z_move(this->graph_subscriber_)); - }); - - close_session.cancel(); - free_shm_provider.cancel(); - undeclare_z_sub.cancel(); } // Shutdown the Zenoh session. @@ -230,22 +240,34 @@ class rmw_context_impl_s::Data final return ret; } - z_undeclare_subscriber(z_move(graph_subscriber_)); - if (shm_provider_.has_value()) { - z_drop(z_move(shm_provider_.value())); + // TODO(ahcorde): review this + // // Shutdown all the nodes in this context. + // for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) { + // ret = node_it->second->shutdown(); + // if (ret != RMW_RET_OK) { + // RMW_ZENOH_LOG_ERROR_NAMED( + // "rmw_zenoh_cpp", + // "Unable to shutdown node with id %zu. rmw_ret_t code: %zu.", + // node_it->second->id(), + // ret + // ); + // } + + zenoh::ZResult err; + std::move(graph_subscriber_cpp_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; } + + session_.reset(); is_shutdown_ = true; // We specifically do *not* hold the mutex_ while tearing down the session; this allows us // to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler(). } - - // Close the zenoh session - if (z_close(z_loan_mut(session_), NULL) != Z_OK) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } - return RMW_RET_OK; } @@ -255,13 +277,13 @@ class rmw_context_impl_s::Data final return enclave_; } - const z_loaned_session_t * session() const + const std::shared_ptr session() const { std::lock_guard lock(mutex_); - return z_loan(session_); + return session_; } - std::optional & shm_provider() + std::optional & shm_provider() { std::lock_guard lock(mutex_); return shm_provider_; @@ -288,7 +310,7 @@ class rmw_context_impl_s::Data final bool session_is_valid() const { std::lock_guard lock(mutex_); - return !z_session_is_closed(z_loan(session_)); + return !session_->is_closed(); } std::shared_ptr graph_cache() @@ -309,7 +331,7 @@ class rmw_context_impl_s::Data final } // Check that the Zenoh session is still valid. - if (z_session_is_closed(z_loan(session_))) { + if (!session_is_valid()) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create NodeData as Zenoh session is invalid."); @@ -319,7 +341,7 @@ class rmw_context_impl_s::Data final auto node_data = rmw_zenoh_cpp::NodeData::make( node, this->get_next_entity_id(), - z_loan(session_), + session(), domain_id_, ns, node_name, @@ -353,17 +375,17 @@ class rmw_context_impl_s::Data final nodes_.erase(node); } - void update_graph_cache(z_sample_kind_t sample_kind, const std::string & keystr) + void update_graph_cache(const zenoh::Sample & sample_kind, const std::string & keystr) { std::lock_guard lock(mutex_); if (is_shutdown_) { return; } - switch (sample_kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + switch (sample_kind.get_kind()) { + case zenoh::SampleKind::Z_SAMPLE_KIND_PUT: graph_cache_->parse_put(keystr); break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + case zenoh::SampleKind::Z_SAMPLE_KIND_DELETE: graph_cache_->parse_del(keystr); break; default: @@ -375,7 +397,7 @@ class rmw_context_impl_s::Data final if (RMW_RET_OK != rmw_ret) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition." + "[update_graph_cache] Unable to trigger graph guard condition." ); } } @@ -396,14 +418,14 @@ class rmw_context_impl_s::Data final // Enclave, name used to find security artifacts in a sros2 keystore. std::string enclave_; // An owned session. - z_owned_session_t session_; + std::shared_ptr session_; // An optional SHM manager that is initialized of SHM is enabled in the // zenoh session config. - std::optional shm_provider_; + std::optional shm_provider_; // Graph cache. std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. - z_owned_subscriber_t graph_subscriber_; + std::optional> graph_subscriber_cpp_; // Equivalent to rmw_dds_common::Context's guard condition. // Guard condition that should be triggered when the graph changes. std::unique_ptr graph_guard_condition_; @@ -417,37 +439,6 @@ class rmw_context_impl_s::Data final std::unordered_map> nodes_; }; -///============================================================================= -static void graph_sub_data_handler(z_loaned_sample_t * sample, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - - auto data_ptr = static_cast(data); - if (data_ptr == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Invalid data_ptr." - ); - return; - } - - // Look up the data shared_ptr in the global map. If it is in there, use it. - // If not, it is being shutdown so we can just ignore this update. - std::shared_ptr data_shared_ptr{nullptr}; - { - std::lock_guard lk(data_to_data_shared_ptr_map_mutex); - if (data_to_data_shared_ptr_map.count(data_ptr) == 0) { - return; - } - data_shared_ptr = data_to_data_shared_ptr_map[data_ptr]; - } - - // Update the graph cache. - std::string liveliness_str(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - data_shared_ptr->update_graph_cache(z_sample_kind(sample), std::move(liveliness_str)); -} - ///============================================================================= rmw_context_impl_s::rmw_context_impl_s( const std::size_t domain_id, @@ -472,13 +463,13 @@ std::string rmw_context_impl_s::enclave() const } ///============================================================================= -const z_loaned_session_t * rmw_context_impl_s::session() const +const std::shared_ptr rmw_context_impl_s::session() const { return data_->session(); } ///============================================================================= -std::optional & rmw_context_impl_s::shm_provider() +std::optional & rmw_context_impl_s::shm_provider() { return data_->shm_provider(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 1e46d6af..a74fab85 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -15,13 +15,13 @@ #ifndef DETAIL__RMW_CONTEXT_IMPL_S_HPP_ #define DETAIL__RMW_CONTEXT_IMPL_S_HPP_ -#include - #include #include #include #include +#include + #include "graph_cache.hpp" #include "rmw_node_data.hpp" @@ -48,13 +48,13 @@ class rmw_context_impl_s final std::string enclave() const; // Loan the Zenoh session. - const z_loaned_session_t * session() const; + const std::shared_ptr session() const; // Get a reference to the shm_provider. // Note: This is not thread-safe. // TODO(Yadunund): Remove this API and instead include a publish() API // that handles the shm_provider once the context manages publishers. - std::optional & shm_provider(); + std::optional & shm_provider(); // Get the graph guard condition. rmw_guard_condition_t * graph_guard_condition(); diff --git a/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp b/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp deleted file mode 100644 index 1574abc5..00000000 --- a/rmw_zenoh_cpp/src/detail/rmw_init_options_impl.hpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_ -#define DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_ - -#include - -namespace rmw_zenoh_cpp -{ -// TODO(YV): Consider using this again. -struct rmw_init_options_impl_s -{ - // An owned config. - z_owned_config_t config; -}; -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__RMW_INIT_OPTIONS_IMPL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index cb4817a5..4a8e8c3a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -30,7 +30,7 @@ namespace rmw_zenoh_cpp std::shared_ptr NodeData::make( const rmw_node_t * const node, std::size_t id, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -38,7 +38,7 @@ std::shared_ptr NodeData::make( { // Create the entity. auto entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), + session->get_zid(), std::to_string(id), std::to_string(id), rmw_zenoh_cpp::liveliness::EntityType::Node, @@ -58,21 +58,18 @@ std::shared_ptr NodeData::make( // Create the liveliness token. std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - z_owned_liveliness_token_t token; - if (z_liveliness_declare_token(session, &token, z_loan(liveliness_ke), NULL) != Z_OK) { + zenoh::ZResult err; + auto token = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the node."); return nullptr; } - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - free_token.cancel(); return std::shared_ptr( new NodeData{ node, @@ -87,7 +84,7 @@ NodeData::NodeData( const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - z_owned_liveliness_token_t token) + zenoh::LivelinessToken token) : node_(node), id_(std::move(id)), entity_(std::move(entity)), @@ -121,7 +118,7 @@ std::size_t NodeData::id() const ///============================================================================= bool NodeData::create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -143,7 +140,7 @@ bool NodeData::create_pub_data( } auto pub_data = PublisherData::make( - std::move(session), + session, node_, entity_->node_info(), id_, @@ -187,7 +184,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) ///============================================================================= bool NodeData::create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -210,7 +207,7 @@ bool NodeData::create_sub_data( } auto sub_data = SubscriptionData::make( - std::move(session), + session, std::move(graph_cache), node_, entity_->node_info(), @@ -255,7 +252,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) ///============================================================================= bool NodeData::create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -277,7 +274,7 @@ bool NodeData::create_service_data( } auto service_data = ServiceData::make( - std::move(session), + session, node_, entity_->node_info(), id_, @@ -322,7 +319,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service) ///============================================================================= bool NodeData::create_client_data( const rmw_client_t * const client, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_supports, @@ -344,7 +341,7 @@ bool NodeData::create_client_data( } auto client_data = ClientData::make( - std::move(session), + session, node_, client, entity_->node_info(), @@ -402,7 +399,14 @@ rmw_ret_t NodeData::shutdown() } // Unregister this node from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); + zenoh::ZResult err; + std::move(token_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } is_shutdown_ = true; return ret; diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index ab17e096..e6fbede5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -15,14 +15,14 @@ #ifndef DETAIL__RMW_NODE_DATA_HPP_ #define DETAIL__RMW_NODE_DATA_HPP_ -#include - #include #include #include #include #include +#include + #include "graph_cache.hpp" #include "liveliness_utils.hpp" #include "rmw_client_data.hpp" @@ -43,7 +43,7 @@ class NodeData final static std::shared_ptr make( const rmw_node_t * const node, std::size_t id, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t domain_id, const std::string & namespace_, const std::string & node_name, @@ -55,7 +55,7 @@ class NodeData final // Create a new PublisherData for a given rmw_publisher_t. bool create_pub_data( const rmw_publisher_t * const publisher, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, @@ -70,7 +70,7 @@ class NodeData final // Create a new SubscriptionData for a given rmw_subscription_t. bool create_sub_data( const rmw_subscription_t * const subscription, - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, std::size_t id, const std::string & topic_name, @@ -86,7 +86,7 @@ class NodeData final // Create a new ServiceData for a given rmw_service_t. bool create_service_data( const rmw_service_t * const service, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -101,7 +101,7 @@ class NodeData final // Create a new ClientData for a given rmw_client_t. bool create_client_data( const rmw_client_t * const client, - const z_loaned_session_t * session, + std::shared_ptr session, std::size_t id, const std::string & service_name, const rosidl_service_type_support_t * type_support, @@ -128,7 +128,7 @@ class NodeData final const rmw_node_t * const node, std::size_t id, std::shared_ptr entity, - z_owned_liveliness_token_t token); + zenoh::LivelinessToken token); // Internal mutex. mutable std::recursive_mutex mutex_; // The rmw_node_t associated with this NodeData. @@ -139,7 +139,7 @@ class NodeData final // The Entity generated for the node. std::shared_ptr entity_; // Liveliness token for the node. - z_owned_liveliness_token_t token_; + std::optional token_; // Shutdown flag. bool is_shutdown_; // Map of publishers. diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index d1449b63..2e9e7433 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "cdr.hpp" #include "rmw_context_impl_s.hpp" @@ -42,7 +43,7 @@ namespace rmw_zenoh_cpp ///============================================================================= std::shared_ptr PublisherData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -82,7 +83,7 @@ std::shared_ptr PublisherData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + session->get_zid(), std::to_string(node_id), std::to_string(publisher_id), liveliness::EntityType::Publisher, @@ -102,52 +103,31 @@ std::shared_ptr PublisherData::make( return nullptr; } - std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_; - z_view_keyexpr_t pub_ke; - if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - + zenoh::ZResult err; + std::optional pub_cache; + zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_); // Create a Publication Cache if durability is transient_local. - std::optional pub_cache = std::nullopt; if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_publication_cache_options_t pub_cache_opts; - ze_publication_cache_options_default(&pub_cache_opts); + zenoh::Session::PublicationCacheOptions pub_cache_opts = + zenoh::Session::PublicationCacheOptions::create_default(); + pub_cache_opts.history = adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; - // Set the queryable_prefix to the session id so that querying subscribers can specify this - // session id to obtain latest data from this specific publication caches when querying over - // the same keyexpression. - // When such a prefix is added to the PublicationCache, it listens to queries with this extra - // prefix (allowing to be queried in a unique way), but still replies with the original - // publications' key expressions. + std::string queryable_prefix = entity->zid(); - z_view_keyexpr_t prefix_ke; - z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str()); - pub_cache_opts.queryable_prefix = z_loan(prefix_ke); - - ze_owned_publication_cache_t pub_cache_; - if (ze_declare_publication_cache( - session, &pub_cache_, z_loan(pub_ke), &pub_cache_opts)) - { + pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix); + + pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &err); + + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } - pub_cache = pub_cache_; } - auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [&pub_cache]() { - if (pub_cache.has_value()) { - z_drop(z_move(pub_cache.value())); - } - }); // Set congestion_control to BLOCK if appropriate. - z_publisher_options_t opts; - z_publisher_options_default(&opts); + zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default(); opts.congestion_control = Z_CONGESTION_CONTROL_DROP; - if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { opts.reliability = Z_RELIABILITY_RELIABLE; @@ -155,45 +135,30 @@ std::shared_ptr PublisherData::make( opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } } - z_owned_publisher_t pub; - // TODO(clalancette): What happens if the key name is a valid but empty string? - if (z_declare_publisher( - session, &pub, z_loan(pub_ke), &opts) != Z_OK) - { + auto pub = session->declare_publisher(pub_ke, std::move(opts), &err); + + if (err != Z_OK) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } - auto undeclare_z_publisher = rcpputils::make_scope_exit( - [&pub]() { - z_undeclare_publisher(z_move(pub)); - }); std::string liveliness_keyexpr = entity->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - z_owned_liveliness_token_t token; - if (z_liveliness_declare_token( - session, &token, z_loan(liveliness_ke), - NULL) != Z_OK) - { + auto token = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the publisher."); return nullptr; } - auto free_token = rcpputils::make_scope_exit( - [&token]() { - z_drop(z_move(token)); - }); - - undeclare_z_publisher_cache.cancel(); - undeclare_z_publisher.cancel(); - free_token.cancel(); return std::shared_ptr( new PublisherData{ node, std::move(entity), + std::move(session), std::move(pub), std::move(pub_cache), std::move(token), @@ -206,13 +171,15 @@ std::shared_ptr PublisherData::make( PublisherData::PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, - z_owned_publisher_t pub, - std::optional pub_cache, - z_owned_liveliness_token_t token, + std::shared_ptr sess, + zenoh::Publisher pub, + std::optional pub_cache, + zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(sess)), pub_(std::move(pub)), pub_cache_(std::move(pub_cache)), token_(std::move(token)), @@ -227,7 +194,7 @@ PublisherData::PublisherData( ///============================================================================= rmw_ret_t PublisherData::publish( const void * ros_message, - std::optional & shm_provider) + std::optional & /*shm_provider*/) { std::lock_guard lock(mutex_); if (is_shutdown_) { @@ -260,29 +227,32 @@ rmw_ret_t PublisherData::publish( }); // Get memory from SHM buffer if available. - if (shm_provider.has_value()) { - RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); - - auto provider = shm_provider.value(); - z_buf_layout_alloc_result_t alloc; - // TODO(yuyuan): SHM, configure this - z_alloc_alignment_t alignment = {5}; - z_shm_provider_alloc_gc_defrag_blocking(&alloc, z_loan(provider), SHM_BUF_OK_SIZE, alignment); - - if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { - shmbuf = std::make_optional(alloc.buf); - msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); - } else { - // TODO(Yadunund): Should we revert to regular allocation and not return an error? - RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); - return RMW_RET_ERROR; - } - } else { - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - } + // if (shm_provider.has_value()) { + // RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled."); + + // auto provider = shm_provider.value()._0; + // z_buf_layout_alloc_result_t alloc; + // // TODO(yuyuan): SHM, configure this + // z_alloc_alignment_t alignment = {5}; + // z_shm_provider_alloc_gc_defrag_blocking( + // &alloc, + // z_loan(provider), + // SHM_BUF_OK_SIZE, + // alignment); + + // if (alloc.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) { + // shmbuf = std::make_optional(alloc.buf); + // msg_bytes = reinterpret_cast(z_shm_mut_data_mut(z_loan_mut(alloc.buf))); + // } else { + // // TODO(Yadunund): Should we revert to regular allocation and not return an error? + // RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing."); + // return RMW_RET_ERROR; + // } + // } else { + // Get memory from the allocator. + msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); @@ -303,24 +273,21 @@ rmw_ret_t PublisherData::publish( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - z_owned_bytes_t attachment; - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - - z_owned_bytes_t payload; - if (shmbuf.has_value()) { - z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); - } else { - z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); - } - - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { + zenoh::ZResult err; + auto options = zenoh::Publisher::PutOptions::create_default(); + options.attachment = create_map_and_set_sequence_num( + sequence_number_++, + entity_->copy_gid()); + + // TODO(ahcorde): shmbuf + std::vector raw_image( + reinterpret_cast(msg_bytes), + reinterpret_cast(msg_bytes) + data_length); + zenoh::Bytes payload(raw_image); + + pub_.put(std::move(payload), std::move(options), &err); + if (err != Z_OK) { + if (err == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -336,7 +303,7 @@ rmw_ret_t PublisherData::publish( ///============================================================================= rmw_ret_t PublisherData::publish_serialized_message( const rmw_serialized_message_t * serialized_message, - std::optional & /*shm_provider*/) + std::optional & /*shm_provider*/) { eprosima::fastcdr::FastBuffer buffer( reinterpret_cast(serialized_message->buffer), serialized_message->buffer_length); @@ -352,19 +319,18 @@ rmw_ret_t PublisherData::publish_serialized_message( // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. - z_publisher_put_options_t options; - z_publisher_put_options_default(&options); - uint8_t local_gid[RMW_GID_STORAGE_SIZE]; - entity_->copy_gid(local_gid); - z_owned_bytes_t attachment; - create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid); - options.attachment = z_move(attachment); - z_owned_bytes_t payload; - z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); - - z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); - if (res != Z_OK) { - if (res == Z_ESESSION_CLOSED) { + zenoh::ZResult err; + auto options = zenoh::Publisher::PutOptions::create_default(); + options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid()); + + std::vector raw_image( + serialized_message->buffer, + serialized_message->buffer + data_length); + zenoh::Bytes payload(raw_image); + + pub_.put(std::move(payload), std::move(options), &err); + if (err != Z_OK) { + if (err == Z_ESESSION_CLOSED) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", "unable to publish message since the zenoh session is closed"); @@ -373,7 +339,6 @@ rmw_ret_t PublisherData::publish_serialized_message( return RMW_RET_ERROR; } } - return RMW_RET_OK; } @@ -398,6 +363,12 @@ void PublisherData::copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const entity_->copy_gid(out_gid); } +std::vector PublisherData::copy_gid() const +{ + std::lock_guard lock(mutex_); + return entity_->copy_gid(); +} + ///============================================================================= bool PublisherData::liveliness_is_valid() const { @@ -437,12 +408,23 @@ rmw_ret_t PublisherData::shutdown() } // Unregister this publisher from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); - if (pub_cache_.has_value()) { - z_drop(z_move(pub_cache_.value())); + zenoh::ZResult err; + std::move(token_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } + std::move(pub_).undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare publisher"); + return RMW_RET_ERROR; } - z_undeclare_publisher(z_move(pub_)); + sess_.reset(); is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index bde0c13b..d953cdd8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -15,14 +15,15 @@ #ifndef DETAIL__RMW_PUBLISHER_DATA_HPP_ #define DETAIL__RMW_PUBLISHER_DATA_HPP_ -#include - #include #include #include #include #include #include +#include + +#include #include "event.hpp" #include "liveliness_utils.hpp" @@ -42,7 +43,7 @@ class PublisherData final public: // Make a shared_ptr of PublisherData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -54,12 +55,12 @@ class PublisherData final // Publish a ROS message. rmw_ret_t publish( const void * ros_message, - std::optional & shm_provider); + std::optional & shm_provider); // Publish a serialized ROS message. rmw_ret_t publish_serialized_message( const rmw_serialized_message_t * serialized_message, - std::optional & shm_provider); + std::optional & shm_provider); // Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity. std::size_t keyexpr_hash() const; @@ -69,6 +70,7 @@ class PublisherData final // Copy the GID of this PublisherData into an rmw_gid_t. void copy_gid(uint8_t out_gid[RMW_GID_STORAGE_SIZE]) const; + std::vector copy_gid() const; // Returns true if liveliness token is still valid. bool liveliness_is_valid() const; @@ -90,9 +92,10 @@ class PublisherData final PublisherData( const rmw_node_t * rmw_node, std::shared_ptr entity, - z_owned_publisher_t pub, - std::optional pub_cache, - z_owned_liveliness_token_t token, + std::shared_ptr session, + zenoh::Publisher pub, + std::optional pub_cache, + zenoh::LivelinessToken token, const void * type_support_impl, std::unique_ptr type_support); @@ -102,12 +105,14 @@ class PublisherData final const rmw_node_t * rmw_node_; // The Entity generated for the publisher. std::shared_ptr entity_; + // A shared session. + std::shared_ptr sess_; // An owned publisher. - z_owned_publisher_t pub_; + zenoh::Publisher pub_; // Optional publication cache when durability is transient_local. - std::optional pub_cache_; + std::optional pub_cache_; // Liveliness token for the publisher. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp index c7be982c..19ad0dc7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.cpp @@ -22,6 +22,9 @@ #include #include #include +#include + +#include #include "attachment_helpers.hpp" #include "cdr.hpp" @@ -38,34 +41,9 @@ namespace rmw_zenoh_cpp { -///============================================================================== -void service_data_handler(z_loaned_query_t * query, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_query_keyexpr(query), &keystr); - - ServiceData * service_data = - static_cast(data); - if (service_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain ServiceData from data for " - "service for %.*s", - static_cast(z_string_len(z_loan(keystr))), - z_string_data(z_loan(keystr)) - ); - return; - } - - std::chrono::nanoseconds::rep received_timestamp = - std::chrono::system_clock::now().time_since_epoch().count(); - - service_data->add_new_query(std::make_unique(query, received_timestamp)); -} - ///============================================================================= std::shared_ptr ServiceData::make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -126,7 +104,7 @@ std::shared_ptr ServiceData::make( std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + session->get_zid(), std::to_string(node_id), std::to_string(service_id), liveliness::EntityType::Service, @@ -150,77 +128,63 @@ std::shared_ptr ServiceData::make( new ServiceData{ node, std::move(entity), + session, request_members, response_members, std::move(request_type_support), std::move(response_type_support) }); - // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr - // in the closure callback once we switch to zenoh-cpp. - if (z_keyexpr_from_str( - &service_data->keyexpr_, - service_data->entity_->topic_info().value().topic_keyexpr_.c_str()) != Z_OK) - { + zenoh::ZResult err; + service_data->keyexpr_ = service_data->entity_->topic_info()->topic_keyexpr_; + zenoh::KeyExpr service_ke(service_data->keyexpr_, true, &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return nullptr; } - auto free_ros_keyexpr = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_drop(z_move(service_data->keyexpr_)); + + zenoh::Session::QueryableOptions qable_options = + zenoh::Session::QueryableOptions::create_default(); + qable_options.complete = true; + + std::weak_ptr data_wp = service_data; + service_data->qable_ = session->declare_queryable( + service_ke, + [data_wp](const zenoh::Query & query) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain ServiceData from data for %s.", + std::string(query.get_keyexpr().as_string_view())); + return; } - }); - z_owned_closure_query_t callback; - z_closure(&callback, service_data_handler, nullptr, service_data.get()); + std::chrono::nanoseconds::rep received_timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); - // Configure the queryable to process complete queries. - z_queryable_options_t qable_options; - z_queryable_options_default(&qable_options); - qable_options.complete = true; - if (z_declare_queryable( - session, &service_data->qable_, z_loan(service_data->keyexpr_), - z_move(callback), &qable_options) != Z_OK) - { + sub_data->add_new_query(std::make_unique(query, received_timestamp)); + }, + zenoh::closures::none, + std::move(qable_options), + &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh queryable"); return nullptr; } - auto undeclare_z_queryable = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_undeclare_queryable(z_move(service_data->qable_)); - } - }); std::string liveliness_keyexpr = service_data->entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - if (z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()) != Z_OK) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - auto free_token = rcpputils::make_scope_exit( - [service_data]() { - if (service_data != nullptr) { - z_drop(z_move(service_data->token_)); - } - }); - if (z_liveliness_declare_token( - session, &service_data->token_, z_loan(liveliness_ke), - NULL) != Z_OK) - { + service_data->token_ = session->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the service."); return nullptr; } - service_data->initialized_ = true; - - free_ros_keyexpr.cancel(); - undeclare_z_queryable.cancel(); - free_token.cancel(); - return service_data; } @@ -228,12 +192,14 @@ std::shared_ptr ServiceData::make( ServiceData::ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr session, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, std::unique_ptr response_type_support) : rmw_node_(rmw_node), entity_(std::move(entity)), + sess_(std::move(session)), request_type_support_impl_(request_type_support_impl), response_type_support_impl_(response_type_support_impl), request_type_support_(std::move(request_type_support)), @@ -279,15 +245,12 @@ void ServiceData::add_new_query(std::unique_ptr query) query_queue_.size() >= adapted_qos_profile.depth) { // Log warning if message is discarded due to hitting the queue depth - z_view_string_t keystr; - z_keyexpr_as_view_string(z_loan(keyexpr_), &keystr); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Query queue depth of %ld reached, discarding oldest Query " "for service for %.*s", adapted_qos_profile.depth, - static_cast(z_string_len(z_loan(keystr))), - z_string_data(z_loan(keystr))); + keyexpr_.c_str()); query_queue_.pop_front(); } query_queue_.emplace_back(std::move(query)); @@ -316,66 +279,91 @@ rmw_ret_t ServiceData::take_request( } std::unique_ptr query = std::move(query_queue_.front()); query_queue_.pop_front(); - const z_loaned_query_t * loaned_query = query->get_query(); + const zenoh::Query & loaned_query = query->get_query(); // DESERIALIZE MESSAGE ======================================================== - z_owned_slice_t payload; - z_bytes_to_slice(z_query_payload(loaned_query), &payload); - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(z_slice_data(z_loan(payload)))), - z_slice_len(z_loan(payload))); - - // Object that serializes the data - Cdr deser(fastbuffer); - if (!request_type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_request, - request_type_support_impl_)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS message"); + auto payload = loaned_query.get_payload(); + if (!payload.has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ServiceData take_request payload is empty"); return RMW_RET_ERROR; } - // Fill in the request header. - // Get the sequence_number out of the attachment - AttachmentData attachment(z_query_attachment(loaned_query)); + auto slice = payload.value().get().slice_iter().next(); - request_header->request_id.sequence_number = attachment.sequence_number(); - if (request_header->request_id.sequence_number < 0) { - RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); - return RMW_RET_ERROR; - } + if (slice.has_value()) { + const uint8_t * payload = slice.value().data; + const size_t payload_len = slice.value().len; - attachment.copy_gid(request_header->request_id.writer_guid); + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload)), payload_len); - request_header->source_timestamp = attachment.source_timestamp(); - if (request_header->source_timestamp < 0) { - RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); - return RMW_RET_ERROR; - } - request_header->received_timestamp = query->get_received_timestamp(); + // Object that serializes the data + Cdr deser(fastbuffer); + if (!request_type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_request, + request_type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } - // Add this query to the map, so that rmw_send_response can quickly look it up later. - const size_t hash = rmw_zenoh_cpp::hash_gid(request_header->request_id.writer_guid); - std::unordered_map::iterator it = sequence_to_query_map_.find(hash); - if (it == sequence_to_query_map_.end()) { - SequenceToQuery stq; - sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq))); - it = sequence_to_query_map_.find(hash); - } else { - // Client already in the map - if (it->second.find(request_header->request_id.sequence_number) != it->second.end()) { - RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + // Fill in the request header. + // Get the sequence_number out of the attachment + if (!loaned_query.get_attachment().has_value()) { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ServiceData take_request attachment is empty"); + return RMW_RET_ERROR; + } + + rmw_zenoh_cpp::AttachmentData attachment(std::move( + loaned_query.get_attachment().value().get())); + + request_header->request_id.sequence_number = attachment.sequence_number(); + if (request_header->request_id.sequence_number < 0) { + RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - } - it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query))); - *taken = true; + memcpy( + request_header->request_id.writer_guid, + attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); - z_drop(z_move(payload)); + request_header->source_timestamp = attachment.source_timestamp(); + if (request_header->source_timestamp < 0) { + RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); + return RMW_RET_ERROR; + } + request_header->received_timestamp = query->get_received_timestamp(); + + // Add this query to the map, so that rmw_send_response can quickly look it up later. + const size_t hash = rmw_zenoh_cpp::hash_gid_p(request_header->request_id.writer_guid); + std::unordered_map::iterator it = sequence_to_query_map_.find(hash); + if (it == sequence_to_query_map_.end()) { + SequenceToQuery stq; + sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq))); + it = sequence_to_query_map_.find(hash); + } else { + // Client already in the map + if (it->second.find(request_header->request_id.sequence_number) != it->second.end()) { + RMW_SET_ERROR_MSG("duplicate sequence number in the map"); + return RMW_RET_ERROR; + } + } + + it->second.insert(std::make_pair(request_header->request_id.sequence_number, std::move(query))); + *taken = true; + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "ServiceData not able to get slice data"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -394,7 +382,7 @@ rmw_ret_t ServiceData::send_response( return RMW_RET_OK; } // Create the queryable payload - const size_t hash = hash_gid(request_id->writer_guid); + const size_t hash = hash_gid_p(request_id->writer_guid); std::unordered_map::iterator it = sequence_to_query_map_.find(hash); if (it == sequence_to_query_map_.end()) { // If there is no data associated with this request, the higher layers of @@ -447,21 +435,33 @@ rmw_ret_t ServiceData::send_response( size_t data_length = ser.get_serialized_data_length(); - const z_loaned_query_t * loaned_query = query->get_query(); - z_query_reply_options_t options; - z_query_reply_options_default(&options); - - z_owned_bytes_t attachment; - rmw_zenoh_cpp::create_map_and_set_sequence_num( - &attachment, request_id->sequence_number, - request_id->writer_guid); - options.attachment = z_move(attachment); + const zenoh::Query & loaned_query = query->get_query(); + zenoh::Query::ReplyOptions options = zenoh::Query::ReplyOptions::create_default(); + std::vector writer_gid; + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + writer_gid.push_back(request_id->writer_guid[RMW_GID_STORAGE_SIZE - 1 - i]); + } + options.attachment = create_map_and_set_sequence_num( + request_id->sequence_number, + writer_gid); + + std::vector raw_bytes( + reinterpret_cast(response_bytes), + reinterpret_cast(response_bytes) + data_length); + zenoh::Bytes payload(raw_bytes); + + zenoh::ZResult err; + zenoh::KeyExpr service_ke(keyexpr_.c_str(), true, &err); + if (err != Z_OK) { + RMW_SET_ERROR_MSG("unable to create KeyExpr"); + return RMW_RET_ERROR; + } - z_owned_bytes_t payload; - z_bytes_copy_from_buf( - &payload, reinterpret_cast(response_bytes), data_length); - z_query_reply( - loaned_query, z_loan(keyexpr_), z_move(payload), &options); + loaned_query.reply(service_ke, std::move(payload), std::move(options), &err); + if (err != Z_OK) { + RMW_SET_ERROR_MSG("unable to reply"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -521,11 +521,25 @@ rmw_ret_t ServiceData::shutdown() // Unregister this node from the ROS graph. if (initialized_) { - z_drop(z_move(keyexpr_)); - z_liveliness_undeclare_token(z_move(token_)); - z_undeclare_queryable(z_move(qable_)); + zenoh::ZResult err; + std::move(token_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } + + std::move(qable_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare queryable"); + return RMW_RET_ERROR; + } } + sess_.reset(); is_shutdown_ = true; return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp index 75467494..53a77830 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_service_data.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__RMW_SERVICE_DATA_HPP_ #define DETAIL__RMW_SERVICE_DATA_HPP_ -#include - #include #include #include @@ -26,6 +24,8 @@ #include #include +#include + #include "event.hpp" #include "liveliness_utils.hpp" #include "message_type_support.hpp" @@ -42,12 +42,12 @@ namespace rmw_zenoh_cpp { ///============================================================================= -class ServiceData final +class ServiceData final : public std::enable_shared_from_this { public: // Make a shared_ptr of ServiceData. static std::shared_ptr make( - const z_loaned_session_t * session, + std::shared_ptr session, const rmw_node_t * const node, liveliness::NodeInfo node_info, std::size_t node_id, @@ -99,6 +99,7 @@ class ServiceData final ServiceData( const rmw_node_t * rmw_node, std::shared_ptr entity, + std::shared_ptr session, const void * request_type_support_impl, const void * response_type_support_impl, std::unique_ptr request_type_support, @@ -110,12 +111,15 @@ class ServiceData final const rmw_node_t * rmw_node_; // The Entity generated for the service. std::shared_ptr entity_; - // An owned keyexpression. - z_owned_keyexpr_t keyexpr_; + + // A shared session + std::shared_ptr sess_; + // The keyexpr string. + std::string keyexpr_; // An owned queryable. - z_owned_queryable_t qable_; + std::optional> qable_; // Liveliness token for the service. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields. const void * request_type_support_impl_; const void * response_type_support_impl_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index f0036606..12808860 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -40,61 +40,23 @@ namespace rmw_zenoh_cpp { -namespace -{ -//============================================================================== -// TODO(Yadunund): Make this a class member and lambda capture weak_from_this() -// instead of passing a rawptr to SubscriptionData when we switch to zenoh-cpp. -void sub_data_handler(z_loaned_sample_t * sample, void * data) -{ - z_view_string_t keystr; - z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr); - - auto sub_data = static_cast(data); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain SubscriptionData from data for %s.", - z_loan(keystr) - ); - return; - } - - AttachmentData attachment(z_sample_attachment(sample)); - const z_loaned_bytes_t * payload = z_sample_payload(sample); - - z_owned_slice_t slice; - z_bytes_to_slice(payload, &slice); - - std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); - - sub_data->add_new_message( - std::make_unique( - slice, - z_timestamp_ntp64_time(z_sample_timestamp(sample)), - std::move(attachment)), - topic_name); -} -} // namespace - ///============================================================================= SubscriptionData::Message::Message( - z_owned_slice_t p, + zenoh::Bytes p, uint64_t recv_ts, AttachmentData && attachment_) -: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) +: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { } ///============================================================================= SubscriptionData::Message::~Message() { - z_drop(z_move(payload)); } ///============================================================================= std::shared_ptr SubscriptionData::make( - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -136,7 +98,7 @@ std::shared_ptr SubscriptionData::make( // with Zenoh; after this, callbacks may come in at any time. std::size_t domain_id = node_info.domain_id_; auto entity = liveliness::Entity::make( - z_info_zid(session), + session->get_zid(), std::to_string(node_id), std::to_string(subscription_id), liveliness::EntityType::Subscription, @@ -161,6 +123,7 @@ std::shared_ptr SubscriptionData::make( node, graph_cache, std::move(entity), + std::move(session), type_support->data, std::move(message_type_support) }); @@ -178,11 +141,13 @@ SubscriptionData::SubscriptionData( const rmw_node_t * rmw_node, std::shared_ptr graph_cache, std::shared_ptr entity, + std::shared_ptr session, const void * type_support_impl, std::unique_ptr type_support) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), + sess_(std::move(session)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), last_known_published_msg_({}), @@ -198,33 +163,33 @@ SubscriptionData::SubscriptionData( // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() { - // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr - // in the closure callback once we switch to zenoh-cpp. - z_owned_closure_sample_t callback; - z_closure(&callback, sub_data_handler, nullptr, this); - - std::string topic_keyexpr = entity_->topic_info()->topic_keyexpr_; - z_view_keyexpr_t sub_ke; - if (z_view_keyexpr_from_str(&sub_ke, topic_keyexpr.c_str()) != Z_OK) { + if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "`reliability` no longer supported on subscriber. Ignoring..."); + } + + zenoh::ZResult err; + zenoh::KeyExpr sub_ke(entity_->topic_info()->topic_keyexpr_, true, &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); return false; } rmw_context_impl_t * context_impl = static_cast(rmw_node_->context->impl); + sess_ = context_impl->session(); + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_querying_subscriber_options_t sub_options; - ze_querying_subscriber_options_default(&sub_options); - // Make the initial query to hit all the PublicationCaches, using a query_selector with - // '*' in place of the queryable_prefix of each PublicationCache + zenoh::Session::QueryingSubscriberOptions sub_options = + zenoh::Session::QueryingSubscriberOptions::create_default(); const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_; - z_view_keyexpr_t selector_ke; - z_view_keyexpr_from_str(&selector_ke, selector.c_str()); - sub_options.query_selector = z_loan(selector_ke); + zenoh::KeyExpr selector_ke(selector); + sub_options.query_keyexpr = std::move(selector_ke); // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. // By default a query accepts only replies that matches its query selector. // This allows us to selectively query certain PublicationCaches when defining the @@ -235,19 +200,51 @@ bool SubscriptionData::init() // We set consolidation to none as we need to receive transient local messages // from a number of publishers. Eg: To receive TF data published over /tf_static // by various publishers. - sub_options.query_consolidation = z_query_consolidation_none(); - ze_owned_querying_subscriber_t sub; - if (ze_declare_querying_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), &sub_options)) - { + sub_options.query_consolidation = + zenoh::QueryConsolidation(zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE); + + std::weak_ptr data_wp = shared_from_this(); + auto sub = context_impl->session()->declare_querying_subscriber( + sub_ke, + [data_wp](const zenoh::Sample & sample) { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain SubscriptionData from data for %s.", + std::string(sample.get_keyexpr().as_string_view())); + return; + } + + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment") + return; + } + + auto attachment_value = attachment.value(); + AttachmentData attachment_data(attachment_value); + + sub_data->add_new_message( + std::make_unique( + sample.get_payload().clone(), + sample.get_timestamp().value().get_time(), + std::move(attachment_data)), + std::string(sample.get_keyexpr().as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } - sub_ = sub; + sub_ = std::move(sub); // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. - std::weak_ptr data_wp = shared_from_this(); graph_cache_->set_querying_subscriber_callback( entity_->topic_info().value().topic_keyexpr_, entity_->keyexpr_hash(), @@ -271,71 +268,81 @@ bool SubscriptionData::init() "QueryingSubscriberCallback triggered over %s.", selector.c_str() ); - z_get_options_t opts; - z_get_options_default(&opts); + zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default(); opts.timeout_ms = std::numeric_limits::max(); - opts.consolidation = z_query_consolidation_latest(); + opts.consolidation = zenoh::ConsolidationMode::Z_CONSOLIDATION_MODE_NONE; opts.accept_replies = ZC_REPLY_KEYEXPR_ANY; - z_view_keyexpr_t ke; - z_view_keyexpr_from_str(&ke, selector.c_str()); - ze_querying_subscriber_get( - z_loan(std::get(sub_data->sub_)), - z_loan(ke), - &opts); + zenoh::ZResult err; + std::get>(sub_data->sub_.value()).get( + zenoh::KeyExpr(selector), + std::move(opts), + &err); + + if (err != Z_OK) { + RMW_SET_ERROR_MSG("unable to get querying subscriber."); + return; + } } ); } else { - // Create a regular subscriber for all other durability settings. - z_subscriber_options_t sub_options; - z_subscriber_options_default(&sub_options); - - z_owned_subscriber_t sub; - if (z_declare_subscriber( - context_impl->session(), &sub, z_loan(sub_ke), z_move(callback), - &sub_options) != Z_OK) - { + zenoh::Session::SubscriberOptions sub_options = + zenoh::Session::SubscriberOptions::create_default(); + std::weak_ptr data_wp = shared_from_this(); + zenoh::Subscriber sub = context_impl->session()->declare_subscriber( + sub_ke, + [data_wp](const zenoh::Sample & sample) { + zenoh::KeyExpr keystr(std::string(sample.get_keyexpr().as_string_view())); + + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback." + ); + return; + } + auto attachment = sample.get_attachment(); + if (!attachment.has_value()) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain attachment") + return; + } + auto payload = sample.get_payload().clone(); + auto attachment_value = attachment.value(); + + AttachmentData attachment_data(attachment_value); + sub_data->add_new_message( + std::make_unique( + sample.get_payload().clone(), + sample.get_timestamp().value().get_time(), + std::move(attachment_data)), + std::string(keystr.as_string_view())); + }, + zenoh::closures::none, + std::move(sub_options), + &err); + if (err != Z_OK) { RMW_SET_ERROR_MSG("unable to create zenoh subscription"); return false; } - sub_ = sub; + sub_ = std::move(sub); } - auto undeclare_z_sub = rcpputils::make_scope_exit( - [this]() { - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub == nullptr || z_undeclare_subscriber(z_move(*sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(z_move(*querying_sub))) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } - } - }); - // Publish to the graph that a new subscription is in town. std::string liveliness_keyexpr = entity_->liveliness_keyexpr(); - z_view_keyexpr_t liveliness_ke; - z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str()); - - auto free_token = rcpputils::make_scope_exit( - [this]() { - z_drop(z_move(token_)); - }); - if (z_liveliness_declare_token( - context_impl->session(), &token_, z_loan(liveliness_ke), NULL) != Z_OK) - { + token_ = context_impl->session()->liveliness_declare_token( + zenoh::KeyExpr(liveliness_keyexpr), + zenoh::Session::LivelinessDeclarationOptions::create_default(), + &err); + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the subscription."); return false; } - undeclare_z_sub.cancel(); - free_token.cancel(); - initialized_ = true; return true; @@ -403,21 +410,36 @@ rmw_ret_t SubscriptionData::shutdown() graph_cache_->remove_qos_event_callbacks(entity_->keyexpr_hash()); // Unregister this subscription from the ROS graph. - z_liveliness_undeclare_token(z_move(token_)); + zenoh::ZResult err; + std::move(token_).value().undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to undeclare liveliness token"); + return RMW_RET_ERROR; + } - z_owned_subscriber_t * sub = std::get_if(&sub_); - if (sub != nullptr) { - if (z_undeclare_subscriber(z_move(*sub)) != Z_OK) { - RMW_SET_ERROR_MSG("failed to undeclare sub."); - ret = RMW_RET_ERROR; - } - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_); - if (querying_sub != nullptr) { - if (ze_undeclare_querying_subscriber(z_move(*querying_sub)) != Z_OK) { - RMW_SET_ERROR_MSG("failed to undeclare querying sub."); - ret = RMW_RET_ERROR; + if (sub_.has_value()) { + zenoh::Subscriber * sub = std::get_if>(&sub_.value()); + if (sub != nullptr) { + std::move(*sub).undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare sub."); + return RMW_RET_ERROR; + } + } else { + zenoh::ext::QueryingSubscriber * sub = + std::get_if>(&sub_.value()); + if (sub != nullptr) { + std::move(*sub).undeclare(&err); + if (err != Z_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "failed to undeclare querying sub."); + return RMW_RET_ERROR; + } } } } @@ -473,37 +495,48 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + auto slice = msg_data->payload.slice_iter().next(); - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(payload)), - payload_len); + if (slice.has_value()) { + const uint8_t * payload = slice.value().data; + const size_t payload_len = slice.value().len; - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!type_support_->deserialize_ros_message( - deser.get_cdr(), - ros_message, - type_support_impl_)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS message"); - return RMW_RET_ERROR; - } + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(payload)), + payload_len); - if (message_info != nullptr) { - message_info->source_timestamp = msg_data->attachment.source_timestamp(); - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - msg_data->attachment.copy_gid(message_info->publisher_gid.data); - message_info->from_intra_process = false; - } + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_message, + type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } - *taken = true; + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->attachment.source_timestamp(); + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->attachment.sequence_number(); + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy( + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } + *taken = true; + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); + return RMW_RET_ERROR; + } return RMW_RET_OK; } @@ -524,30 +557,41 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + auto slice = msg_data->payload.slice_iter().next(); - if (serialized_message->buffer_capacity < payload_len) { - rmw_ret_t ret = - rmw_serialized_message_resize(serialized_message, payload_len); - if (ret != RMW_RET_OK) { - return ret; // Error message already set + if (slice.has_value()) { + const uint8_t * payload = slice.value().data; + const size_t payload_len = slice.value().len; + if (serialized_message->buffer_capacity < payload_len) { + rmw_ret_t ret = + rmw_serialized_message_resize(serialized_message, payload_len); + if (ret != RMW_RET_OK) { + return ret; // Error message already set + } } - } - serialized_message->buffer_length = payload_len; - memcpy(serialized_message->buffer, payload, payload_len); - - *taken = true; - - if (message_info != nullptr) { - message_info->source_timestamp = msg_data->attachment.source_timestamp(); - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->attachment.sequence_number(); - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - msg_data->attachment.copy_gid(message_info->publisher_gid.data); - message_info->from_intra_process = false; + serialized_message->buffer_length = payload_len; + memcpy(serialized_message->buffer, payload, payload_len); + + *taken = true; + + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->attachment.source_timestamp(); + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->attachment.sequence_number(); + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy( + message_info->publisher_gid.data, + msg_data->attachment.copy_gid().data(), + RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData not able to get slice data"); + return RMW_RET_ERROR; } return RMW_RET_OK; @@ -584,7 +628,7 @@ void SubscriptionData::add_new_message( } // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = msg->attachment.gid_hash(); + const size_t gid_hash = hash_gid(msg->attachment.copy_gid()); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { const int64_t seq_increment = std::abs( diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 84d9a488..74e82ec9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -15,8 +15,6 @@ #ifndef DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ #define DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ -#include - #include #include #include @@ -28,6 +26,8 @@ #include #include +#include + #include "event.hpp" #include "graph_cache.hpp" #include "liveliness_utils.hpp" @@ -49,20 +49,20 @@ class SubscriptionData final : public std::enable_shared_from_this make( - const z_loaned_session_t * session, + std::shared_ptr session, std::shared_ptr graph_cache, const rmw_node_t * const node, liveliness::NodeInfo node_info, @@ -126,6 +126,7 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache, std::shared_ptr entity, + std::shared_ptr session, const void * type_support_impl, std::unique_ptr type_support); @@ -139,10 +140,12 @@ class SubscriptionData final : public std::enable_shared_from_this graph_cache_; // The Entity generated for the subscription. std::shared_ptr entity_; + // A shared session + std::shared_ptr sess_; // An owned subscriber or querying_subscriber depending on the QoS settings. - std::variant sub_; + std::optional, zenoh::ext::QueryingSubscriber>> sub_; // Liveliness token for the subscription. - z_owned_liveliness_token_t token_; + std::optional token_; // Type support fields const void * type_support_impl_; std::unique_ptr type_support_; diff --git a/rmw_zenoh_cpp/src/detail/type_support.cpp b/rmw_zenoh_cpp/src/detail/type_support.cpp index fff22474..fc462975 100644 --- a/rmw_zenoh_cpp/src/detail/type_support.cpp +++ b/rmw_zenoh_cpp/src/detail/type_support.cpp @@ -131,10 +131,10 @@ bool TypeSupport::deserialize_ros_message( uint8_t dump = 0; deser >> dump; (void)dump; - } catch (const eprosima::fastcdr::exception::Exception &) { + } catch (const eprosima::fastcdr::exception::Exception & e) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Fast CDR exception deserializing message of type %s.", - get_name()); + "Fast CDR exception deserializing message of type %s. %s", + get_name(), e.what()); return false; } diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp index f295befc..6c906522 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.cpp @@ -19,6 +19,9 @@ #include #include +#include +#include + #include "logging_macros.hpp" #include @@ -41,10 +44,9 @@ static const std::unordered_map _get_z_config( const char * envar_name, - const char * default_uri, - z_owned_config_t * config) + const char * default_uri) { const char * configured_uri; const char * envar_uri; @@ -53,40 +55,42 @@ rmw_ret_t _get_z_config( // NULL is returned if everything is ok. RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Envar %s cannot be read.", envar_name); - return RMW_RET_ERROR; + return std::nullopt; } // If the environment variable is set, try to read the configuration from the file, // if the environment variable is not set use internal configuration configured_uri = envar_uri[0] != '\0' ? envar_uri : default_uri; // Try to read the configuration - if (zc_config_from_file(config, configured_uri) != Z_OK) { + zenoh::ZResult err; + zenoh::Config config = zenoh::Config::from_file(configured_uri, &err); + if (err != Z_OK) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Invalid configuration file %s", configured_uri); - return RMW_RET_ERROR; + return std::nullopt; } RMW_ZENOH_LOG_DEBUG_NAMED( "rmw_zenoh_cpp", "configured using configuration file %s", configured_uri); - return RMW_RET_OK; + return config; } } // namespace ///============================================================================= -rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * config) +std::optional get_z_config(const ConfigurableEntity & entity) { auto envar_map_it = envar_map.find(entity); if (envar_map_it == envar_map.end()) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "get_z_config called with invalid ConfigurableEntity."); - return RMW_RET_ERROR; + return std::nullopt; } // Get the absolute path to the default configuration file. static const std::string path_to_config_folder = ament_index_cpp::get_package_share_directory("rmw_zenoh_cpp") + "/config/"; const std::string default_config_path = path_to_config_folder + envar_map_it->second.second; - return _get_z_config(envar_map_it->second.first, default_config_path.c_str(), config); + return _get_z_config(envar_map_it->second.first, default_config_path.c_str()); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp index dceafe5d..1f1e194f 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_config.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_config.hpp @@ -15,12 +15,13 @@ #ifndef DETAIL__ZENOH_CONFIG_HPP_ #define DETAIL__ZENOH_CONFIG_HPP_ -#include - #include #include #include +#include +#include + #include "rmw/ret_types.h" #define ZENOH_LOG_ENV_VAR_STR "RUST_LOG" @@ -46,9 +47,9 @@ enum class ConfigurableEntity : uint8_t /// is configured using the rmw_zenoh default configuration file. /// @param entity The zenoh entity to be configured. /// @param config The zenoh configuration to be filled. -/// @returns `RMW_RET_OK` if the configuration was successfully loaded. +/// @returns The zenoh configuration to be filled. [[nodiscard]] -rmw_ret_t get_z_config(const ConfigurableEntity & entity, z_owned_config_t * config); +std::optional get_z_config(const ConfigurableEntity & entity); ///============================================================================= /// Get the number of times rmw_init should try to connect to a zenoh router diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp deleted file mode 100644 index b6c5d21e..00000000 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "zenoh_router_check.hpp" - -#include - -#include -#include -#include - -#include "logging_macros.hpp" -#include "liveliness_utils.hpp" - -namespace rmw_zenoh_cpp -{ -///============================================================================= -rmw_ret_t zenoh_router_check(const z_loaned_session_t * session) -{ - // Initialize context for callback - int context = 0; - - // Define callback - auto callback = [](const struct z_id_t * id, void * ctx) { - const std::string id_str = liveliness::zid_to_str(*id); - RMW_ZENOH_LOG_INFO_NAMED( - "rmw_zenoh_cpp", - "Successfully connected to a Zenoh router with id %s.", id_str.c_str()); - // Note: Callback is guaranteed to never be called - // concurrently according to z_info_routers_zid docstring - (*(static_cast(ctx)))++; - }; - - z_owned_closure_zid_t router_callback; - z_closure(&router_callback, callback, NULL, &context); - if (z_info_routers_zid(session, z_move(router_callback)) != Z_OK) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Failed to evaluate if Zenoh routers are connected to the session."); - return RMW_RET_ERROR; - } else { - if (context == 0) { - return RMW_RET_ERROR; - } - } - - return RMW_RET_OK; -} -} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp deleted file mode 100644 index 1db74a4c..00000000 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.hpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2023 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef DETAIL__ZENOH_ROUTER_CHECK_HPP_ -#define DETAIL__ZENOH_ROUTER_CHECK_HPP_ - -#include - -#include "rmw/ret_types.h" - -namespace rmw_zenoh_cpp -{ -/// Check if a Zenoh router is connected to the session. -/// @param session Zenoh session to check. -/// @return RMW_RET_OK if a Zenoh router is connected to the session. -rmw_ret_t zenoh_router_check(const z_loaned_session_t * session); -} // namespace rmw_zenoh_cpp - -#endif // DETAIL__ZENOH_ROUTER_CHECK_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 77bb4dc2..1e71c865 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include #include "attachment_helpers.hpp" #include "rcpputils/scope_exit.hpp" @@ -25,23 +27,23 @@ namespace rmw_zenoh_cpp { ///============================================================================= -void create_map_and_set_sequence_num( - z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]) +zenoh::Bytes create_map_and_set_sequence_num( + int64_t sequence_number, std::vector gid) { auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); int64_t source_timestamp = now_ns.count(); - AttachmentData data(sequence_number, source_timestamp, gid); - data.serialize_to_zbytes(out_bytes); + rmw_zenoh_cpp::AttachmentData data(sequence_number, source_timestamp, gid); + return std::move(data.serialize_to_zbytes()); } ///============================================================================= ZenohQuery::ZenohQuery( - const z_loaned_query_t * query, + const zenoh::Query & query, std::chrono::nanoseconds::rep received_timestamp) { - z_query_clone(&query_, query); + query_ = query.clone(); received_timestamp_ = received_timestamp; } @@ -52,37 +54,27 @@ std::chrono::nanoseconds::rep ZenohQuery::get_received_timestamp() const } ///============================================================================= -ZenohQuery::~ZenohQuery() -{ - z_drop(z_move(query_)); -} +ZenohQuery::~ZenohQuery() {} ///============================================================================= -const z_loaned_query_t * ZenohQuery::get_query() const {return z_loan(query_);} +const zenoh::Query & ZenohQuery::get_query() const {return query_.value();} ///============================================================================= ZenohReply::ZenohReply( - const z_loaned_reply_t * reply, + const zenoh::Reply & reply, std::chrono::nanoseconds::rep received_timestamp) { - z_reply_clone(&reply_, reply); + reply_ = reply.clone(); received_timestamp_ = received_timestamp; } ///============================================================================= -ZenohReply::~ZenohReply() -{ - z_drop(z_move(reply_)); -} +ZenohReply::~ZenohReply() {} ///============================================================================= -std::optional ZenohReply::get_sample() const +const zenoh::Reply & ZenohReply::get_sample() const { - if (z_reply_is_ok(z_loan(reply_))) { - return z_reply_ok(z_loan(reply_)); - } - - return std::nullopt; + return reply_.value(); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 5f569082..0a7d10b1 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -15,37 +15,36 @@ #ifndef DETAIL__ZENOH_UTILS_HPP_ #define DETAIL__ZENOH_UTILS_HPP_ -#include +#include #include #include #include +#include #include "rmw/types.h" namespace rmw_zenoh_cpp { ///============================================================================= -void -create_map_and_set_sequence_num( - z_owned_bytes_t * out_bytes, int64_t sequence_number, - uint8_t gid[RMW_GID_STORAGE_SIZE]); +zenoh::Bytes create_map_and_set_sequence_num( + int64_t sequence_number, std::vector gid); ///============================================================================= // A class to store the replies to service requests. class ZenohReply final { public: - ZenohReply(const z_loaned_reply_t * reply, std::chrono::nanoseconds::rep received_timestamp); + ZenohReply(const zenoh::Reply & reply, std::chrono::nanoseconds::rep received_timestamp); ~ZenohReply(); - std::optional get_sample() const; + const zenoh::Reply & get_sample() const; std::chrono::nanoseconds::rep get_received_timestamp() const; private: - z_owned_reply_t reply_; + std::optional reply_; std::chrono::nanoseconds::rep received_timestamp_; }; @@ -54,16 +53,16 @@ class ZenohReply final class ZenohQuery final { public: - ZenohQuery(const z_loaned_query_t * query, std::chrono::nanoseconds::rep received_timestamp); + ZenohQuery(const zenoh::Query & query, std::chrono::nanoseconds::rep received_timestamp); ~ZenohQuery(); - const z_loaned_query_t * get_query() const; + const zenoh::Query & get_query() const; std::chrono::nanoseconds::rep get_received_timestamp() const; private: - z_owned_query_t query_; + std::optional query_; std::chrono::nanoseconds::rep received_timestamp_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index 3d7570cc..2ca1808e 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -35,6 +35,7 @@ rmw_get_publishers_info_by_topic( bool no_mangle, rmw_topic_endpoint_info_array_t * publishers_info) { + printf("zenoh::rmw_get_publishers_info_by_topic\n"); RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index e76d6062..86a1974b 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - -#include #include #include -#include "detail/guard_condition.hpp" +#include + #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" #include "detail/rmw_context_impl_s.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_init_options.cpp b/rmw_zenoh_cpp/src/rmw_init_options.cpp index 783c5a6a..15e06e2b 100644 --- a/rmw_zenoh_cpp/src/rmw_init_options.cpp +++ b/rmw_zenoh_cpp/src/rmw_init_options.cpp @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include +#include #include "detail/identifier.hpp" -#include "detail/rmw_init_options_impl.hpp" #include "rcpputils/scope_exit.hpp" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 1e13a710..59f753bd 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -13,8 +13,7 @@ // limitations under the License. #include - -#include +#include #include #include @@ -26,7 +25,8 @@ #include #include -#include "detail/attachment_helpers.hpp" +#include + #include "detail/cdr.hpp" #include "detail/guard_condition.hpp" #include "detail/graph_cache.hpp" @@ -751,7 +751,7 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (!pub_data->liveliness_is_valid()) { + if (pub_data->is_shutdown()) { return RMW_RET_ERROR; } @@ -2451,7 +2451,7 @@ rmw_get_gid_for_publisher(const rmw_publisher_t * publisher, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - pub_data->copy_gid(gid->data); + memcpy(gid->data, pub_data->copy_gid().data(), RMW_GID_STORAGE_SIZE); return RMW_RET_OK; } @@ -2468,7 +2468,7 @@ rmw_get_gid_for_client(const rmw_client_t * client, rmw_gid_t * gid) RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); gid->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - client_data->copy_gid(gid->data); + memcpy(gid->data, client_data->copy_gid().data(), RMW_GID_STORAGE_SIZE); return RMW_RET_OK; } diff --git a/rmw_zenoh_cpp/src/zenohd/main.cpp b/rmw_zenoh_cpp/src/zenohd/main.cpp index ce73fc14..ab791a03 100644 --- a/rmw_zenoh_cpp/src/zenohd/main.cpp +++ b/rmw_zenoh_cpp/src/zenohd/main.cpp @@ -23,16 +23,14 @@ #include #endif -#include -#include +#include +#include #include "../detail/zenoh_config.hpp" -#include "../detail/liveliness_utils.hpp" #include "rmw/error_handling.h" - -#include "rcpputils/scope_exit.hpp" +#include "rcutils/env.h" static bool running = true; static std::mutex run_mutex; @@ -60,40 +58,36 @@ int main(int argc, char ** argv) (void)argc; (void)argv; - // If not already defined, set the logging environment variable for Zenoh router - // to info level by default. - // TODO(Yadunund): Switch to rcutils_get_env once it supports not overwriting values. - if (setenv(ZENOH_LOG_ENV_VAR_STR, ZENOH_LOG_INFO_LEVEL_STR, 0) != 0) { + if (!rcutils_set_env_overwrite(ZENOH_LOG_ENV_VAR_STR, ZENOH_LOG_INFO_LEVEL_STR, 0)) { RMW_SET_ERROR_MSG("Error configuring Zenoh logging."); return 1; } - // Enable the zenoh built-in logger. - zc_try_init_log_from_env(); + // Enable the zenoh built-in logger + zenoh::try_init_log_from_env(); + + std::optional config = rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Router); - // Initialize the zenoh configuration for the router. - z_owned_config_t config; - if ((rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Router, - &config)) != RMW_RET_OK) - { + if (!config.has_value()) { RMW_SET_ERROR_MSG("Error configuring Zenoh router."); return 1; } - z_owned_session_t session; - if (z_open(&session, z_move(config), NULL) != Z_OK) { - printf("Unable to open router session!\n"); + zenoh::ZResult result; + auto session = zenoh::Session::open( + std::move(config.value()), + zenoh::Session::SessionOptions::create_default(), + &result); + if (result != Z_OK) { + std::cout << "Error opening Session!" << "\\n"; return 1; } - auto always_close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_loan_mut(session), NULL); - }); - - printf( - "Started Zenoh router with id %s.\n", - rmw_zenoh_cpp::liveliness::zid_to_str(z_info_zid(z_session_loan(&session))).c_str()); + + std::cout << "Started Zenoh router with id" + << session.get_zid().to_string() + << std::endl; + #ifdef _WIN32 SetConsoleCtrlHandler(quit, TRUE); #else diff --git a/zenoh_c_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt similarity index 85% rename from zenoh_c_vendor/CMakeLists.txt rename to zenoh_cpp_vendor/CMakeLists.txt index cddd2fd4..37bdbd41 100644 --- a/zenoh_c_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.16) -project(zenoh_c_vendor) +project(zenoh_cpp_vendor) # Default to C++17 if(NOT CMAKE_CXX_STANDARD) @@ -35,4 +35,15 @@ ament_vendor(zenoh_c_vendor ament_export_dependencies(zenohc) +ament_vendor(zenoh_cpp_vendor + VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp + VCS_VERSION e8eca99b4eaff0963e52aefd8405909c1552080d + CMAKE_ARGS + -DZENOHCXX_ZENOHC=OFF +) + +externalproject_add_stepdependencies(zenoh_cpp_vendor configure zenoh_c_vendor) + +ament_export_dependencies(zenohcxx) + ament_package() diff --git a/zenoh_c_vendor/package.xml b/zenoh_cpp_vendor/package.xml similarity index 95% rename from zenoh_c_vendor/package.xml rename to zenoh_cpp_vendor/package.xml index b907feff..eb2c8c20 100644 --- a/zenoh_c_vendor/package.xml +++ b/zenoh_cpp_vendor/package.xml @@ -1,7 +1,7 @@ - zenoh_c_vendor + zenoh_cpp_vendor 0.0.1 Vendor pkg to install zenoh-c Yadunund