Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracepoint for publish/subscribe serialized_message #2

Open
wants to merge 5 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ However, `rmw_fastrtps` offers the possibility to further configure Fast DDS:
* [Full QoS configuration](#full-qos-configuration)
* [Change participant discovery options](#change-participant-discovery-options)
* [Enable Zero Copy Data Sharing](#enable-zero-copy-data-sharing)
* [Large data transfer over lossy network](#large-data-transfer-over-lossy-network)

### Change publication mode

Expand Down Expand Up @@ -306,6 +307,26 @@ In order to achieve a Zero Copy message delivery, applications need to both enab
</profiles>
```

### Large data transfer over lossy network

Out of the box, Fast DDS uses UDPv4 for the data communication over the network.
Although UDP has its own merit for realtime communications, with many applications relying on UDP, depending on application requirements, a more reliable network transmission may be needed.
Such cases included but are not limited to sending large data samples over lossy networks, where TCP's builtin reliability and flow control tend to perform better.

Because of this reason, Fast DDS provides the possibility to modify its builtin transports via an environmental variable `FASTDDS_BUILTIN_TRANSPORTS`, allowing for easily changing the transport layer to TCP when needed:

```bash
export FASTDDS_BUILTIN_TRANSPORTS=LARGE_DATA
```

This `LARGE_DATA` mode adds a TCP transport for data communication, restricting the use of the UDP transport to the first part of the discovery process, thus achieving a reliable transmission with automatic discovery capabilities.
This will improve the transmission of large data samples over lossy networks.

> [!NOTE]
> The environmental variable needs to be set on both publisher and subscription side.

For more information, please refer to [FASTDDS_BUILTIN_TRANSPORTS](https://fast-dds.docs.eprosima.com/en/latest/fastdds/env_vars/env_vars.html#fastdds-builtin-transports).

## Quality Declaration files

Quality Declarations for each package in this repository:
Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ find_package(rmw_fastrtps_shared_cpp REQUIRED)
find_package(tracetools REQUIRED)

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
find_package(fastrtps 2.10 REQUIRED CONFIG)
find_package(FastRTPS 2.10 REQUIRED MODULE)
find_package(fastcdr 2 REQUIRED CONFIG)
find_package(fastrtps 2.13 REQUIRED CONFIG)
find_package(FastRTPS 2.13 REQUIRED MODULE)

find_package(rmw REQUIRED)
find_package(rosidl_dynamic_typesupport REQUIRED)
Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_cpp/rmw_fastrtps_cpp-extras.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# copied from rmw_fastrtps_cpp/rmw_fastrtps_cpp-extras.cmake

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
find_package(fastrtps REQUIRED CONFIG)
find_package(FastRTPS REQUIRED MODULE)
find_package(fastcdr 2 REQUIRED CONFIG)
find_package(fastrtps 2.13 REQUIRED CONFIG)
find_package(FastRTPS 2.13 REQUIRED MODULE)

list(APPEND rmw_fastrtps_cpp_INCLUDE_DIRS ${FastRTPS_INCLUDE_DIR})
# specific order: dependents before dependencies
Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ rmw_serialize(
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), data_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);

auto ret = tss.serializeROSmessage(ros_message, ser, callbacks);
serialized_message->buffer_length = data_length;
Expand Down Expand Up @@ -81,8 +82,7 @@ rmw_deserialize(
auto tss = MessageTypeSupport_cpp(callbacks);
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN);

auto ret = tss.deserializeROSmessage(deser, ros_message, callbacks);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_dynamic_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ find_package(rmw_dds_common REQUIRED)
find_package(rmw_fastrtps_shared_cpp REQUIRED)

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
find_package(fastrtps 2.10 REQUIRED CONFIG)
find_package(FastRTPS 2.10 REQUIRED MODULE)
find_package(fastcdr 2 REQUIRED CONFIG)
find_package(fastrtps 2.13 REQUIRED CONFIG)
find_package(FastRTPS 2.13 REQUIRED MODULE)

find_package(rmw REQUIRED)
find_package(rosidl_runtime_c REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# copied from rmw_fastrtps_dynamic_cpp/rmw_fastrtps_dynamic_cpp-extras.cmake

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
find_package(fastrtps REQUIRED CONFIG)
find_package(FastRTPS REQUIRED MODULE)
find_package(fastcdr 2 REQUIRED CONFIG)
find_package(fastrtps 2.13 REQUIRED CONFIG)
find_package(FastRTPS 2.13 REQUIRED MODULE)

list(APPEND rmw_fastrtps_dynamic_cpp_INCLUDE_DIRS ${FastRTPS_INCLUDE_DIR})
# specific order: dependents before dependencies
Expand Down
14 changes: 7 additions & 7 deletions rmw_fastrtps_dynamic_cpp/src/TypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void serialize_field(
if (!member->is_array_) {
ser << *static_cast<T *>(field);
} else if (member->array_size_ && !member->is_upper_bound_) {
ser.serializeArray(static_cast<T *>(field), member->array_size_);
ser.serialize_array(static_cast<T *>(field), member->array_size_);
} else {
std::vector<T> & data = *reinterpret_cast<std::vector<T> *>(field);
ser << data;
Expand Down Expand Up @@ -120,10 +120,10 @@ void serialize_field(
if (!member->is_array_) {
ser << *static_cast<T *>(field);
} else if (member->array_size_ && !member->is_upper_bound_) {
ser.serializeArray(static_cast<T *>(field), member->array_size_);
ser.serialize_array(static_cast<T *>(field), member->array_size_);
} else {
auto & data = *reinterpret_cast<typename GenericCSequence<T>::type *>(field);
ser.serializeSequence(reinterpret_cast<T *>(data.data), data.size);
ser.serialize_sequence(reinterpret_cast<T *>(data.data), data.size);
}
}

Expand Down Expand Up @@ -561,7 +561,7 @@ void deserialize_field(
if (!member->is_array_) {
deser >> *static_cast<T *>(field);
} else if (member->array_size_ && !member->is_upper_bound_) {
deser.deserializeArray(static_cast<T *>(field), member->array_size_);
deser.deserialize_array(static_cast<T *>(field), member->array_size_);
} else {
auto & vector = *reinterpret_cast<std::vector<T> *>(field);
deser >> vector;
Expand All @@ -578,7 +578,7 @@ inline void deserialize_field<std::string>(
deser >> *static_cast<std::string *>(field);
} else if (member->array_size_ && !member->is_upper_bound_) {
std::string * array = static_cast<std::string *>(field);
deser.deserializeArray(array, member->array_size_);
deser.deserialize_array(array, member->array_size_);
} else {
auto & vector = *reinterpret_cast<std::vector<std::string> *>(field);
deser >> vector;
Expand Down Expand Up @@ -618,13 +618,13 @@ void deserialize_field(
if (!member->is_array_) {
deser >> *static_cast<T *>(field);
} else if (member->array_size_ && !member->is_upper_bound_) {
deser.deserializeArray(static_cast<T *>(field), member->array_size_);
deser.deserialize_array(static_cast<T *>(field), member->array_size_);
} else {
auto & data = *reinterpret_cast<typename GenericCSequence<T>::type *>(field);
int32_t dsize = 0;
deser >> dsize;
GenericCSequence<T>::init(&data, dsize);
deser.deserializeArray(reinterpret_cast<T *>(data.data), dsize);
deser.deserialize_array(reinterpret_cast<T *>(data.data), dsize);
}
}

Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_dynamic_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ rmw_serialize(
reinterpret_cast<char *>(serialized_message->buffer),
data_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);

auto ret = tss->serializeROSmessage(ros_message, ser, ts->data);
serialized_message->buffer_length = data_length;
Expand Down Expand Up @@ -85,8 +86,7 @@ rmw_deserialize(
auto tss = type_registry.get_message_type_support(ts);
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN);

auto ret = tss->deserializeROSmessage(deser, ros_message, ts->data);
type_registry.return_message_type_support(ts);
Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ find_package(rosidl_typesupport_introspection_cpp REQUIRED)
find_package(tracetools REQUIRED)

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
find_package(fastrtps 2.10 REQUIRED CONFIG)
find_package(FastRTPS 2.10 REQUIRED MODULE)
find_package(fastcdr 2 REQUIRED CONFIG)
find_package(fastrtps 2.13 REQUIRED CONFIG)
find_package(FastRTPS 2.13 REQUIRED MODULE)

find_package(rmw REQUIRED)

Expand Down
6 changes: 3 additions & 3 deletions rmw_fastrtps_shared_cpp/rmw_fastrtps_shared_cpp-extras.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# copied from rmw_fastrtps_shared_cpp/rmw_fastrtps_shared_cpp-extras.cmake

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
find_package(fastrtps REQUIRED CONFIG)
find_package(FastRTPS REQUIRED MODULE)
find_package(fastcdr 2 REQUIRED CONFIG)
find_package(fastrtps 2.13 REQUIRED CONFIG)
find_package(FastRTPS 2.13 REQUIRED MODULE)

list(APPEND rmw_fastrtps_shared_cpp_INCLUDE_DIRS ${FastRTPS_INCLUDE_DIR})
# specific order: dependents before dependencies
Expand Down
26 changes: 15 additions & 11 deletions rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ bool TypeSupport::serialize(
eprosima::fastcdr::FastBuffer fastbuffer( // Object that manages the raw buffer
reinterpret_cast<char *>(payload->data), payload->max_size);
eprosima::fastcdr::Cdr ser( // Object that serializes the data
fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);
if (this->serializeROSmessage(ser_data->data, ser, ser_data->impl)) {
payload->encapsulation = ser.endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
payload->length = (uint32_t)ser.getSerializedDataLength();
payload->length = (uint32_t)ser.get_serialized_data_length();
return true;
}
break;
Expand All @@ -91,11 +93,11 @@ bool TypeSupport::serialize(
case FASTRTPS_SERIALIZED_DATA_TYPE_CDR_BUFFER:
{
auto ser = static_cast<eprosima::fastcdr::Cdr *>(ser_data->data);
if (payload->max_size >= ser->getSerializedDataLength()) {
payload->length = static_cast<uint32_t>(ser->getSerializedDataLength());
if (payload->max_size >= ser->get_serialized_data_length()) {
payload->length = static_cast<uint32_t>(ser->get_serialized_data_length());
payload->encapsulation = ser->endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength());
memcpy(payload->data, ser->get_buffer_pointer(), ser->get_serialized_data_length());
return true;
}
break;
Expand Down Expand Up @@ -132,7 +134,7 @@ bool TypeSupport::deserialize(
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(payload->data), payload->length);
eprosima::fastcdr::Cdr deser(
fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN);
return deserializeROSmessage(deser, ser_data->data, ser_data->impl);
}

Expand Down Expand Up @@ -171,7 +173,7 @@ std::function<uint32_t()> TypeSupport::getSerializedSizeProvider(void * data)
{
if (ser_data->type == FASTRTPS_SERIALIZED_DATA_TYPE_CDR_BUFFER) {
auto ser = static_cast<eprosima::fastcdr::Cdr *>(ser_data->data);
return static_cast<uint32_t>(ser->getSerializedDataLength());
return static_cast<uint32_t>(ser->get_serialized_data_length());
}
return static_cast<uint32_t>(
this->getEstimatedSerializedSize(ser_data->data, ser_data->impl));
Expand Down Expand Up @@ -301,12 +303,13 @@ const TypeObject * GetCompleteObject(
// DDS document)
eprosima::fastcdr::Cdr ser(
fastbuffer, eprosima::fastcdr::Cdr::LITTLE_ENDIANNESS,
eprosima::fastcdr::Cdr::DDS_CDR); // Object that serializes the data.
eprosima::fastcdr::CdrVersion::XCDRv1); // Object that serializes the data.
ser.set_encoding_flag(eprosima::fastcdr::PLAIN_CDR);
payload.encapsulation = CDR_LE;

type_object->serialize(ser);
payload.length =
static_cast<uint32_t>(ser.getSerializedDataLength()); // Get the serialized length
static_cast<uint32_t>(ser.get_serialized_data_length()); // Get the serialized length
MD5 objectHash;
objectHash.update(reinterpret_cast<char *>(payload.data), payload.length);
objectHash.finalize();
Expand Down Expand Up @@ -377,12 +380,13 @@ const TypeObject * GetMinimalObject(
// DDS document)
eprosima::fastcdr::Cdr ser(
fastbuffer, eprosima::fastcdr::Cdr::LITTLE_ENDIANNESS,
eprosima::fastcdr::Cdr::DDS_CDR); // Object that serializes the data.
eprosima::fastcdr::CdrVersion::XCDRv1); // Object that serializes the data.
ser.set_encoding_flag(eprosima::fastcdr::PLAIN_CDR);
payload.encapsulation = CDR_LE;

type_object->serialize(ser);
payload.length =
static_cast<uint32_t>(ser.getSerializedDataLength()); // Get the serialized length
static_cast<uint32_t>(ser.get_serialized_data_length()); // Get the serialized length
MD5 objectHash;
objectHash.update(reinterpret_cast<char *>(payload.data), payload.length);
objectHash.finalize();
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ __rmw_publish_serialized_message(
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);
if (!ser.jump(serialized_message->buffer_length)) {
RMW_SET_ERROR_MSG("cannot correctly set serialized buffer");
return RMW_RET_ERROR;
Expand All @@ -108,7 +109,10 @@ __rmw_publish_serialized_message(
data.type = FASTRTPS_SERIALIZED_DATA_TYPE_CDR_BUFFER;
data.data = &ser;
data.impl = nullptr; // not used when type is FASTRTPS_SERIALIZED_DATA_TYPE_CDR_BUFFER
if (!info->data_writer_->write(&data)) {
eprosima::fastrtps::Time_t stamp;
eprosima::fastrtps::Time_t::now(stamp);
TRACETOOLS_TRACEPOINT(rmw_publish, publisher, serialized_message, stamp.to_ns());
if (!info->data_writer_->write_w_timestamp(&data, eprosima::fastdds::dds::HANDLE_NIL, stamp)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}
Expand Down
3 changes: 1 addition & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ __rmw_take_request(

auto raw_type_support = dynamic_cast<rmw_fastrtps_shared_cpp::TypeSupport *>(
info->response_type_support_.get());
eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN);
if (raw_type_support->deserializeROSmessage(
deser, ros_request, info->request_type_support_impl_))
{
Expand Down
3 changes: 1 addition & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ __rmw_take_response(
info->response_type_support_.get());
eprosima::fastcdr::Cdr deser(
*response.buffer_,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN);
if (raw_type_support->deserializeROSmessage(
deser, ros_response, info->response_type_support_impl_))
{
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,12 @@ _take_serialized_message(
break;
}
}

TRACETOOLS_TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
(message_info ? message_info->source_timestamp : 0LL),
*taken);
return RMW_RET_OK;
}

Expand Down