Skip to content

Commit

Permalink
feat(tracing): using tracing and zenoh-util init log
Browse files Browse the repository at this point in the history
Signed-off-by: gabrik <[email protected]>
  • Loading branch information
gabrik committed Apr 3, 2024
1 parent 35bba42 commit d96c903
Show file tree
Hide file tree
Showing 21 changed files with 451 additions and 396 deletions.
289 changes: 173 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,27 @@ cdr = "0.2.4"
clap = "4.4.11"
cyclors = "0.2.0"
derivative = "2.2.0"
env_logger = "0.10.0"
flume = "0.11.0"
futures = "0.3.26"
git-version = "0.3.5"
hex = "0.4.3"
lazy_static = "1.4.0"
log = "0.4.17"
regex = "1.7.1"
rustc_version = "0.4"
serde = "1.0.154"
serde_json = "1.0.94"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
tracing = "0.1"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing", features = [
"unstable",
] }
zenoh-collections = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-core = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
zenoh-collections = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing" }
zenoh-core = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing" }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing", features = [
"unstable",
] }
zenoh-plugin-rest = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-plugin-trait = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-plugin-rest = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing", default-features = false }
zenoh-plugin-trait = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing", default-features = false }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "feat/tracing", default-features = false }

[profile.release]
debug = false
Expand Down
4 changes: 2 additions & 2 deletions zenoh-bridge-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ dds_shm = ["zenoh-plugin-ros2dds/dds_shm"]
async-std = { workspace = true, features = ["unstable", "attributes"] }
async-liveliness-monitor = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
env_logger = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
zenoh = { workspace = true }
zenoh-plugin-rest = { workspace = true }
zenoh-plugin-trait = { workspace = true }
zenoh-plugin-ros2dds = { version = "0.11.0-dev", path = "../zenoh-plugin-ros2dds/", default-features = false }
zenoh-util = { workspace = true }

[[bin]]
name = "zenoh-bridge-ros2dds"
Expand Down
14 changes: 7 additions & 7 deletions zenoh-bridge-ros2dds/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ fn parse_args() -> (Option<f32>, Config) {

#[async_std::main]
async fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("z=info")).init();
log::info!(
zenoh_util::init_log();
tracing::info!(
"zenoh-bridge-ros2dds {}",
zenoh_plugin_ros2dds::ROS2Plugin::PLUGIN_LONG_VERSION
);
Expand Down Expand Up @@ -106,7 +106,7 @@ fn run_watchdog(period: f32) {
// Start a Liveliness Monitor thread for async_std Runtime
let (_task, monitor) = LivelinessMonitor::start(async_std::task::spawn);
std::thread::spawn(move || {
log::debug!(
tracing::debug!(
"Watchdog started with period {} sec",
sleep_time.as_secs_f32()
);
Expand All @@ -117,7 +117,7 @@ fn run_watchdog(period: f32) {

// Monitor watchdog thread itself
if elapsed > sleep_time + max_sleep_delta {
log::warn!(
tracing::warn!(
"Watchdog thread slept more than configured: {} seconds",
elapsed.as_secs_f32()
);
Expand All @@ -126,11 +126,11 @@ fn run_watchdog(period: f32) {
let report = monitor.latest_report();
if report.elapsed() > report_threshold_1 {
if report.elapsed() > sleep_time {
log::error!("Watchdog detecting async_std is stalled! No task scheduling since {} seconds", report.elapsed().as_secs_f32());
tracing::error!("Watchdog detecting async_std is stalled! No task scheduling since {} seconds", report.elapsed().as_secs_f32());
} else if report.elapsed() > report_threshold_2 {
log::warn!("Watchdog detecting async_std was not scheduling tasks during the last {} ms", report.elapsed().as_micros());
tracing::warn!("Watchdog detecting async_std was not scheduling tasks during the last {} ms", report.elapsed().as_micros());
} else {
log::info!("Watchdog detecting async_std was not scheduling tasks during the last {} ms", report.elapsed().as_micros());
tracing::info!("Watchdog detecting async_std was not scheduling tasks during the last {} ms", report.elapsed().as_micros());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions zenoh-bridge-ros2dds/src/ros_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ impl RosArgs {
for r in &self.remap {
match r.from {
RemapFrom::Namespace => {
log::info!(
tracing::info!(
"Remapping namespace to '{}' as per ROS command line argument",
r.to
);
insert_json5(config, "plugins/ros2dds/namespace", &r.to);
}
RemapFrom::Node => {
log::info!(
tracing::info!(
"Remapping node name to '{}' as per ROS command line argument",
r.to
);
Expand Down
3 changes: 1 addition & 2 deletions zenoh-plugin-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ bincode = { workspace = true }
cdr = { workspace = true }
cyclors = { workspace = true }
derivative = { workspace = true }
env_logger = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
git-version = { workspace = true }
hex = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
zenoh = { workspace = true }
zenoh-collections = { workspace = true }
zenoh-core = { workspace = true }
Expand Down
16 changes: 8 additions & 8 deletions zenoh-plugin-ros2dds/src/dds_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let topic_name = match CStr::from_ptr((*sample).topic_name).to_str() {
Ok(s) => s,
Err(e) => {
log::warn!("Discovery of an invalid topic name: {}", e);
tracing::warn!("Discovery of an invalid topic name: {}", e);
continue;
}
};
if topic_name.starts_with("DCPS") {
log::debug!(
tracing::debug!(
"Ignoring discovery of {} ({} is a builtin topic)",
key,
topic_name
Expand All @@ -124,14 +124,14 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let type_name = match CStr::from_ptr((*sample).type_name).to_str() {
Ok(s) => s,
Err(e) => {
log::warn!("Discovery of an invalid topic type: {}", e);
tracing::warn!("Discovery of an invalid topic type: {}", e);
continue;
}
};
let participant_key = (*sample).participant_key.v.into();
let keyless = (*sample).key.v[15] == 3 || (*sample).key.v[15] == 4;

log::debug!(
tracing::debug!(
"Discovered DDS {} {} from Participant {} on {} with type {} (keyless: {})",
discovery_type,
key,
Expand All @@ -148,15 +148,15 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
0 => match type_info.is_null() {
false => Some(Arc::new(TypeInfo::new(type_info))),
true => {
log::trace!(
tracing::trace!(
"Type information not available for type {}",
type_name
);
None
}
},
_ => {
log::warn!(
tracing::warn!(
"Failed to lookup type information({})",
CStr::from_ptr(dds_strretcode(ret))
.to_str()
Expand Down Expand Up @@ -215,7 +215,7 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
}

if is_alive {
log::debug!("Discovered DDS Participant {})", key,);
tracing::debug!("Discovered DDS Participant {})", key,);

// Send a DDSDiscoveryEvent
let entity = DdsParticipant {
Expand All @@ -242,7 +242,7 @@ unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {

fn send_discovery_event(sender: &Sender<DDSDiscoveryEvent>, event: DDSDiscoveryEvent) {
if let Err(e) = sender.try_send(event) {
log::error!(
tracing::error!(
"INTERNAL ERROR sending DDSDiscoveryEvent to internal channel: {:?}",
e
);
Expand Down
2 changes: 1 addition & 1 deletion zenoh-plugin-ros2dds/src/dds_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ where
if reader >= 0 {
let res = dds_reader_wait_for_historical_data(reader, qos::DDS_100MS_DURATION);
if res < 0 {
log::error!(
tracing::error!(
"Error calling dds_reader_wait_for_historical_data(): {}",
CStr::from_ptr(dds_strretcode(-res))
.to_str()
Expand Down
28 changes: 14 additions & 14 deletions zenoh-plugin-ros2dds/src/discovered_entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl DiscoveredEntities {
// Remove associated NodeInfos
if let Some(nodes) = self.nodes_info.remove(gid) {
for (name, mut node) in nodes {
log::info!("Undiscovered ROS Node {}", name);
tracing::info!("Undiscovered ROS Node {}", name);
self.admin_space.remove(
&zenoh::keformat!(ke_admin_node::formatter(), node_id = node.id_as_keyexpr(),)
.unwrap(),
Expand Down Expand Up @@ -267,7 +267,7 @@ impl DiscoveredEntities {
// Remove nodes that are no longer present in ParticipantEntitiesInfo
nodes_map.retain(|name, node| {
if !ros_info.node_entities_info_seq.contains_key(name) {
log::info!("Undiscovered ROS Node {}", name);
tracing::info!("Undiscovered ROS Node {}", name);
admin_space.remove(
&zenoh::keformat!(ke_admin_node::formatter(), node_id = node.id_as_keyexpr(),)
.unwrap(),
Expand All @@ -284,7 +284,7 @@ impl DiscoveredEntities {
for (name, ros_node_info) in &ros_info.node_entities_info_seq {
// If node was not yet discovered, add a new NodeInfo
if !nodes_map.contains_key(name) {
log::info!("Discovered ROS Node {}", name);
tracing::info!("Discovered ROS Node {}", name);
match NodeInfo::create(
ros_node_info.node_namespace.clone(),
ros_node_info.node_name.clone(),
Expand All @@ -302,7 +302,7 @@ impl DiscoveredEntities {
nodes_map.insert(node.fullname().to_string(), node);
}
Err(e) => {
log::warn!("ROS Node has incompatible name: {e}");
tracing::warn!("ROS Node has incompatible name: {e}");
break;
}
}
Expand Down Expand Up @@ -333,19 +333,19 @@ impl DiscoveredEntities {
// For each declared Reader
for rgid in &ros_node_info.reader_gid_seq {
if let Some(entity) = readers.get(rgid) {
log::trace!(
tracing::trace!(
"ROS Node {ros_node_info} declares a Reader on {}",
entity.topic_name
);
if let Some(e) = node.update_with_reader(entity) {
log::debug!(
tracing::debug!(
"ROS Node {ros_node_info} declares a new Reader on {}",
entity.topic_name
);
events.push(e)
};
} else {
log::debug!(
tracing::debug!(
"ROS Node {ros_node_info} declares a not yet discovered DDS Reader: {rgid}"
);
node.undiscovered_reader.push(*rgid);
Expand All @@ -354,19 +354,19 @@ impl DiscoveredEntities {
// For each declared Writer
for wgid in &ros_node_info.writer_gid_seq {
if let Some(entity) = writers.get(wgid) {
log::trace!(
tracing::trace!(
"ROS Node {ros_node_info} declares Writer on {}",
entity.topic_name
);
if let Some(e) = node.update_with_writer(entity) {
log::debug!(
tracing::debug!(
"ROS Node {ros_node_info} declares a new Writer on {}",
entity.topic_name
);
events.push(e)
};
} else {
log::debug!(
tracing::debug!(
"ROS Node {ros_node_info} declares a not yet discovered DDS Writer: {wgid}"
);
node.undiscovered_writer.push(*wgid);
Expand Down Expand Up @@ -414,7 +414,7 @@ impl DiscoveredEntities {
// the selector, if those keys had the admin_keyexpr_prefix.
let sub_kes = selector.key_expr.strip_prefix(admin_keyexpr_prefix);
if sub_kes.is_empty() {
log::error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
tracing::error!("Received query for admin space: '{}' - but it's not prefixed by admin_keyexpr_prefix='{}'", selector, admin_keyexpr_prefix);
return;
}

Expand Down Expand Up @@ -453,12 +453,12 @@ impl DiscoveredEntities {
.res_async()
.await
{
log::warn!("Error replying to admin query {:?}: {}", query, e);
tracing::warn!("Error replying to admin query {:?}: {}", query, e);
}
}
Ok(None) => log::error!("INTERNAL ERROR: Dangling {:?} for {}", entity_ref, key_expr),
Ok(None) => tracing::error!("INTERNAL ERROR: Dangling {:?} for {}", entity_ref, key_expr),
Err(e) => {
log::error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
tracing::error!("INTERNAL ERROR serializing admin value as JSON: {}", e)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions zenoh-plugin-ros2dds/src/discovery_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,39 +81,39 @@ impl DiscoveryMgr {
let evts = zwrite!(discovered_entities).remove_participant(&key);
for e in evts {
if let Err(err) = evt_sender.try_send(e) {
log::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
},
DDSDiscoveryEvent::DiscoveredPublication{entity} => {
let e = zwrite!(discovered_entities).add_writer(entity);
if let Some(e) = e {
if let Err(err) = evt_sender.try_send(e) {
log::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
},
DDSDiscoveryEvent::UndiscoveredPublication{key} => {
let e = zwrite!(discovered_entities).remove_writer(&key);
if let Some(e) = e {
if let Err(err) = evt_sender.try_send(e) {
log::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
},
DDSDiscoveryEvent::DiscoveredSubscription {entity} => {
let e = zwrite!(discovered_entities).add_reader(entity);
if let Some(e) = e {
if let Err(err) = evt_sender.try_send(e) {
log::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
},
DDSDiscoveryEvent::UndiscoveredSubscription {key} => {
let e = zwrite!(discovered_entities).remove_reader(&key);
if let Some(e) = e {
if let Err(err) = evt_sender.try_send(e) {
log::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
},
Expand All @@ -123,11 +123,11 @@ impl DiscoveryMgr {
_ = ros_disco_timer_rcv.recv_async() => {
let infos = ros_discovery_mgr.read();
for part_info in infos {
log::debug!("Received ros_discovery_info from {}", part_info);
tracing::debug!("Received ros_discovery_info from {}", part_info);
let evts = zwrite!(discovered_entities).update_participant_info(part_info);
for e in evts {
if let Err(err) = evt_sender.try_send(e) {
log::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
}
Expand Down
Loading

0 comments on commit d96c903

Please sign in to comment.