Skip to content

Commit

Permalink
Add tracing instrumentation using tracetools (#294)
Browse files Browse the repository at this point in the history
* Add tracing instrumentation using tracetools

* Remove the create_map_and_set_sequence_num function.

With the recent changes to get data out of it, and the change
to zenoh-cpp, it can mostly be replaced with a couple of
inline pieces of code.  However, we do add in a new function
to help us explicitly get system clock time from std::chrono
in nanoseconds.

Signed-off-by: Christophe Bedard <[email protected]>
Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
christophebedard authored Dec 20, 2024
1 parent 65e0cab commit 79fc5ce
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 32 deletions.
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_fastrtps_c REQUIRED)
find_package(rosidl_typesupport_fastrtps_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(tracetools REQUIRED)
find_package(zenoh_cpp_vendor REQUIRED)

add_library(rmw_zenoh_cpp SHARED
Expand Down Expand Up @@ -67,6 +68,7 @@ target_link_libraries(rmw_zenoh_cpp
rosidl_typesupport_fastrtps_c::rosidl_typesupport_fastrtps_c
rosidl_typesupport_fastrtps_cpp::rosidl_typesupport_fastrtps_cpp
rmw::rmw
tracetools::tracetools
zenohcxx::zenohc
)

Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<depend>rosidl_typesupport_fastrtps_c</depend>
<depend>rosidl_typesupport_fastrtps_cpp</depend>
<depend>rmw</depend>
<depend>tracetools</depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
Expand Down
13 changes: 11 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
Expand Down Expand Up @@ -363,10 +365,17 @@ rmw_ret_t ClientData::send_request(
size_t data_length = ser.get_serialized_data_length();
*sequence_id = sequence_number_++;

TRACETOOLS_TRACEPOINT(
rmw_send_request,
static_cast<const void *>(rmw_client_),
static_cast<const void *>(ros_request),
*sequence_id);

// Send request
zenoh::Session::GetOptions opts = zenoh::Session::GetOptions::create_default();
std::array<uint8_t, RMW_GID_STORAGE_SIZE> local_gid = entity_->copy_gid();
opts.attachment = rmw_zenoh_cpp::create_map_and_set_sequence_num(*sequence_id, local_gid);
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
opts.attachment = rmw_zenoh_cpp::AttachmentData(
*sequence_id, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();
opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
// The default timeout for a z_get query is 10 seconds and if a response is not received within
// this window, the queryable will return an invalid reply. However, it is common for actions,
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
mutable std::recursive_mutex mutex_;
// The parent node.
const rmw_node_t * rmw_node_;
// The rmw client.
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
Expand Down
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ bool NodeData::create_pub_data(

auto pub_data = PublisherData::make(
session,
publisher,
node_,
entity_->node_info(),
id_,
Expand Down Expand Up @@ -276,6 +277,7 @@ bool NodeData::create_service_data(
auto service_data = ServiceData::make(
session,
node_,
service,
entity_->node_info(),
id_,
std::move(id),
Expand Down
22 changes: 17 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
// TODO(yuyuan): SHM, make this configurable
Expand All @@ -45,6 +47,7 @@ namespace rmw_zenoh_cpp
///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
std::shared_ptr<zenoh::Session> session,
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -158,6 +161,7 @@ std::shared_ptr<PublisherData> PublisherData::make(

return std::shared_ptr<PublisherData>(
new PublisherData{
rmw_publisher,
node,
std::move(entity),
std::move(session),
Expand All @@ -171,6 +175,7 @@ std::shared_ptr<PublisherData> PublisherData::make(

///=============================================================================
PublisherData::PublisherData(
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> sess,
Expand All @@ -179,7 +184,8 @@ PublisherData::PublisherData(
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
: rmw_node_(rmw_node),
: rmw_publisher_(rmw_publisher),
rmw_node_(rmw_node),
entity_(std::move(entity)),
sess_(std::move(sess)),
pub_(std::move(pub)),
Expand Down Expand Up @@ -246,17 +252,19 @@ rmw_ret_t PublisherData::publish(
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
zenoh::ZResult result;
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(
sequence_number_++,
entity_->copy_gid());
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
Expand Down Expand Up @@ -292,14 +300,18 @@ rmw_ret_t PublisherData::publish_serialized_message(
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
zenoh::ZResult result;
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(sequence_number_++, entity_->copy_gid());
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(std::move(raw_data));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
Expand Down
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class PublisherData final
// Make a shared_ptr of PublisherData.
static std::shared_ptr<PublisherData> make(
std::shared_ptr<zenoh::Session> session,
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * const node,
liveliness::NodeInfo node_info,
std::size_t node_id,
Expand Down Expand Up @@ -90,6 +91,7 @@ class PublisherData final
private:
// Constructor.
PublisherData(
const rmw_publisher_t * const rmw_publisher,
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
Expand All @@ -101,6 +103,8 @@ class PublisherData final

// Internal mutex.
mutable std::mutex mutex_;
// The rmw publisher
const rmw_publisher_t * rmw_publisher_;
// The parent node.
const rmw_node_t * rmw_node_;
// The Entity generated for the publisher.
Expand Down
19 changes: 16 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

#include "tracetools/tracetools.h"

namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ServiceData> ServiceData::make(
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
const rmw_service_t * rmw_service,
liveliness::NodeInfo node_info,
std::size_t node_id,
std::size_t service_id,
Expand Down Expand Up @@ -128,6 +131,7 @@ std::shared_ptr<ServiceData> ServiceData::make(
auto service_data = std::shared_ptr<ServiceData>(
new ServiceData{
node,
rmw_service,
std::move(entity),
session,
request_members,
Expand Down Expand Up @@ -192,13 +196,15 @@ std::shared_ptr<ServiceData> ServiceData::make(
///=============================================================================
ServiceData::ServiceData(
const rmw_node_t * rmw_node,
const rmw_service_t * rmw_service,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::unique_ptr<RequestTypeSupport> request_type_support,
std::unique_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_service_(rmw_service),
entity_(std::move(entity)),
sess_(std::move(session)),
request_type_support_impl_(request_type_support_impl),
Expand Down Expand Up @@ -438,9 +444,9 @@ rmw_ret_t ServiceData::send_response(
zenoh::Query::ReplyOptions options = zenoh::Query::ReplyOptions::create_default();
std::array<uint8_t, RMW_GID_STORAGE_SIZE> writer_gid;
memcpy(writer_gid.data(), request_id->writer_guid, RMW_GID_STORAGE_SIZE);
options.attachment = create_map_and_set_sequence_num(
request_id->sequence_number,
writer_gid);
int64_t source_timestamp = rmw_zenoh_cpp::get_system_time_in_ns();
options.attachment = rmw_zenoh_cpp::AttachmentData(
request_id->sequence_number, source_timestamp, writer_gid).serialize_to_zbytes();

std::vector<uint8_t> raw_bytes(
reinterpret_cast<const uint8_t *>(response_bytes),
Expand All @@ -454,6 +460,13 @@ rmw_ret_t ServiceData::send_response(
return RMW_RET_ERROR;
}

TRACETOOLS_TRACEPOINT(
rmw_send_response,
static_cast<const void *>(rmw_service_),
static_cast<const void *>(ros_response),
request_id->writer_guid,
request_id->sequence_number,
source_timestamp);
loaned_query.reply(service_ke, std::move(payload), std::move(options), &result);
if (result != Z_OK) {
RMW_SET_ERROR_MSG("unable to reply");
Expand Down
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_service_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
static std::shared_ptr<ServiceData> make(
std::shared_ptr<zenoh::Session> session,
const rmw_node_t * const node,
const rmw_service_t * rmw_service,
liveliness::NodeInfo node_info,
std::size_t node_id,
std::size_t service_id,
Expand Down Expand Up @@ -98,6 +99,7 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
// Constructor.
ServiceData(
const rmw_node_t * rmw_node,
const rmw_service_t * rmw_service,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * request_type_support_impl,
Expand All @@ -109,6 +111,8 @@ class ServiceData final : public std::enable_shared_from_this<ServiceData>
mutable std::mutex mutex_;
// The parent node.
const rmw_node_t * rmw_node_;
// The rmw service.
const rmw_service_t * rmw_service_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;

Expand Down
19 changes: 7 additions & 12 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid)
{
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
int64_t source_timestamp = now_ns.count();

rmw_zenoh_cpp::AttachmentData data(sequence_number, source_timestamp, gid);
return data.serialize_to_zbytes();
}

///=============================================================================
ZenohQuery::ZenohQuery(
const zenoh::Query & query,
Expand Down Expand Up @@ -82,4 +70,11 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
{
return received_timestamp_;
}

int64_t get_system_time_in_ns()
{
auto now = std::chrono::system_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
}

} // namespace rmw_zenoh_cpp
6 changes: 2 additions & 4 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid);

///=============================================================================
// A class to store the replies to service requests.
class ZenohReply final
Expand Down Expand Up @@ -65,6 +61,8 @@ class ZenohQuery final
zenoh::Query query_;
std::chrono::nanoseconds::rep received_timestamp_;
};

int64_t get_system_time_in_ns();
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_UTILS_HPP_
Loading

0 comments on commit 79fc5ce

Please sign in to comment.