diff --git a/rmw_zenoh_cpp/src/detail/payload.hpp b/rmw_zenoh_cpp/src/detail/payload.hpp new file mode 100644 index 00000000..d2aaf4c8 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/payload.hpp @@ -0,0 +1,92 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__PAYLOAD_HPP_ +#define DETAIL__PAYLOAD_HPP_ + +#include + +#include +#include + +namespace rmw_zenoh_cpp +{ + ///============================================================================= +class Payload +{ +public: + explicit Payload(const z_loaned_bytes_t *bytes) + { + z_bytes_slice_iterator_t slices = z_bytes_get_slice_iterator(bytes); + z_view_slice_t view; + z_bytes_slice_iterator_next(&slices, &view); + if (!z_bytes_slice_iterator_next(&slices, &view)) { + z_owned_bytes_t owned_bytes; + z_bytes_clone(&owned_bytes, bytes); + Contiguous slice; + slice.view = view; + slice.owned_bytes = owned_bytes; + bytes_ = slice; + } else { + z_owned_slice_t slice; + z_bytes_to_slice(bytes, &slice); + bytes_ = slice; + } + } + + ~Payload() + { + if (std::holds_alternative(bytes_)) { + z_drop(z_move(std::get(bytes_))); + } else { + z_drop(z_move(std::get(bytes_).owned_bytes)); + } + } + + const uint8_t * data() + { + if (std::holds_alternative(bytes_)) { + z_owned_slice_t owned = std::get(bytes_); + return z_slice_data(z_loan(owned)); + } else { + z_view_slice_t view = std::get(bytes_).view; + return z_slice_data(z_loan(view)); + } + } + + size_t size() + { + if (std::holds_alternative(bytes_)) { + z_owned_slice_t owned = std::get(bytes_); + return z_slice_len(z_loan(owned)); + } else { + z_view_slice_t view = std::get(bytes_).view; + return z_slice_len(z_loan(view)); + } + } + +private: + struct Contiguous + { + z_view_slice_t view; + z_owned_bytes_t owned_bytes; + }; + using NonContiguous = z_owned_slice_t; + // Is `z_owned_slice_t` in case of a non-contiguous `bytes` + // and `z_view_slice_t` plus a `z_owned_bytes_t` otherwise. + std::variant bytes_; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__PAYLOAD_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 14f1ed8f..93354e7b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -63,14 +63,11 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) AttachmentData attachment(z_sample_attachment(sample)); const z_loaned_bytes_t * payload = z_sample_payload(sample); - z_owned_slice_t slice; - z_bytes_to_slice(payload, &slice); - std::string topic_name(z_string_data(z_loan(keystr)), z_string_len(z_loan(keystr))); sub_data->add_new_message( std::make_unique( - slice, + payload, std::chrono::system_clock::now().time_since_epoch().count(), std::move(attachment)), topic_name); @@ -79,17 +76,11 @@ void sub_data_handler(z_loaned_sample_t * sample, void * data) ///============================================================================= SubscriptionData::Message::Message( - z_owned_slice_t p, + const z_loaned_bytes_t * bytes, uint64_t recv_ts, AttachmentData && attachment_) -: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) -{ -} - -///============================================================================= -SubscriptionData::Message::~Message() +: payload(Payload(bytes)), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { - z_drop(z_move(payload)); } ///============================================================================= @@ -479,8 +470,8 @@ rmw_ret_t SubscriptionData::take_one_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + const uint8_t * payload = msg_data->payload.data(); + const size_t payload_len = msg_data->payload.size(); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( @@ -530,8 +521,8 @@ rmw_ret_t SubscriptionData::take_serialized_message( std::unique_ptr msg_data = std::move(message_queue_.front()); message_queue_.pop_front(); - const uint8_t * payload = z_slice_data(z_loan(msg_data->payload)); - const size_t payload_len = z_slice_len(z_loan(msg_data->payload)); + const uint8_t * payload = msg_data->payload.data(); + const size_t payload_len = msg_data->payload.size(); if (serialized_message->buffer_capacity < payload_len) { rmw_ret_t ret = diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 08a09def..a44758ab 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -35,6 +35,7 @@ #include "attachment_helpers.hpp" #include "type_support_common.hpp" #include "zenoh_utils.hpp" +#include "payload.hpp" #include "rcutils/allocator.h" @@ -50,13 +51,13 @@ class SubscriptionData final : public std::enable_shared_from_this