From 19609f03cafe9ffd864e8d09a57c3fc073338d07 Mon Sep 17 00:00:00 2001 From: Markus Weber Date: Sun, 29 Oct 2023 20:05:44 +0000 Subject: [PATCH 1/2] first try to upgrade dds_writer dds --- zenoh-plugin-dds/src/lib.rs | 8 +++++++- zenoh-plugin-dds/src/route_zenoh_dds.rs | 17 ++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/zenoh-plugin-dds/src/lib.rs b/zenoh-plugin-dds/src/lib.rs index 705786e8..eed7b5dd 100644 --- a/zenoh-plugin-dds/src/lib.rs +++ b/zenoh-plugin-dds/src/lib.rs @@ -530,7 +530,6 @@ impl<'a> DdsPluginRuntime<'a> { } if let Some(route) = self.routes_to_dds.get(&ke) { - // TODO: check if there is no type or QoS conflict with existing route debug!( "Route from resource {} to DDS already exists -- ignoring", ke @@ -539,6 +538,13 @@ impl<'a> DdsPluginRuntime<'a> { // (just to declare the Zenoh Subscriber). Thus, try to set a DDS Writer to the route here. // If already set, nothing will happen. if let Some(qos) = writer_qos { + if let Some(exiting_qos) = route.get_qos() { + if qos.clone().reliability.unwrap().kind as isize > exiting_qos.reliability.unwrap().kind as isize { + // new reliability is higher than existing one. Destroy and create new reader + route.delete_dds_writer(); + // and then continue to create the route again + } + } if let Err(e) = route.set_dds_writer(self.dp, qos) { error!( "{}: failed to set a DDS Writer after creation: {}", diff --git a/zenoh-plugin-dds/src/route_zenoh_dds.rs b/zenoh-plugin-dds/src/route_zenoh_dds.rs index 2f5641f5..57360ddc 100644 --- a/zenoh-plugin-dds/src/route_zenoh_dds.rs +++ b/zenoh-plugin-dds/src/route_zenoh_dds.rs @@ -13,7 +13,7 @@ // use cyclors::{ - dds_entity_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov, + dds_entity_t, dds_qos_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov, dds_get_qos, ddsi_serdata_kind_SDK_DATA, ddsi_sertype, ddsrt_iovec_t, }; use serde::{Serialize, Serializer}; @@ -352,6 +352,21 @@ impl RouteZenohDDS<'_> { pub(crate) fn has_local_routed_reader(&self) -> bool { !self.local_routed_readers.is_empty() } + + pub(crate) fn get_qos(&self) -> Option { + let qos_ptr: *mut dds_qos_t = std::ptr::null_mut(); + + unsafe { + dds_get_qos(self.dds_writer.load(Ordering::Acquire), qos_ptr); + + if qos_ptr.is_null() { + return Option::None; + } else { + return Option::Some(Qos::from_qos_native(qos_ptr)); + } + } + } + } fn do_route_data(s: Sample, topic_name: &str, data_writer: dds_entity_t) { From 20b9c4a79c1ae29e993e136f1c851abc6ef798a9 Mon Sep 17 00:00:00 2001 From: Markus Weber Date: Sun, 29 Oct 2023 22:30:12 +0000 Subject: [PATCH 2/2] fix incompatible QoS for 2 different subs * adapt allways higher QoS reliability for DDS writer --- zenoh-plugin-dds/src/lib.rs | 43 +++++++++++++++++-------- zenoh-plugin-dds/src/route_zenoh_dds.rs | 21 ++++++------ 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/zenoh-plugin-dds/src/lib.rs b/zenoh-plugin-dds/src/lib.rs index eed7b5dd..57ef9db4 100644 --- a/zenoh-plugin-dds/src/lib.rs +++ b/zenoh-plugin-dds/src/lib.rs @@ -537,21 +537,38 @@ impl<'a> DdsPluginRuntime<'a> { // #102: in forwarding mode, it might happen that the route have been created but without DDS Writer // (just to declare the Zenoh Subscriber). Thus, try to set a DDS Writer to the route here. // If already set, nothing will happen. - if let Some(qos) = writer_qos { - if let Some(exiting_qos) = route.get_qos() { - if qos.clone().reliability.unwrap().kind as isize > exiting_qos.reliability.unwrap().kind as isize { - // new reliability is higher than existing one. Destroy and create new reader + match (writer_qos.clone(), route.get_qos()) { + (Some(new_qos), Some(exiting_qos)) => { + info!("Route Zenoh->DDS ({} -> {}): Upgrading QoS {}->{}", + ke, topic_name, + exiting_qos.clone().reliability.unwrap().kind as isize, + new_qos.clone().reliability.unwrap().kind as isize + ); + + // new reliability is higher than existing one. Destroy existing writer before creating new one + if new_qos.clone().reliability.unwrap().kind as isize > exiting_qos.reliability.unwrap().kind as isize { route.delete_dds_writer(); - // and then continue to create the route again + // route.set_qos(new_qos); + if let Err(e) = route.set_dds_writer(self.dp, new_qos) { + error!( + "{}: failed to set a DDS Writer after creation: {}", + route, e + ); + return RouteStatus::CreationFailure(e); + } } - } - if let Err(e) = route.set_dds_writer(self.dp, qos) { - error!( - "{}: failed to set a DDS Writer after creation: {}", - route, e - ); - return RouteStatus::CreationFailure(e); - } + }, + (Some(new_qos), None) => { + // no writer existing yet. Only create new one + if let Err(e) = route.set_dds_writer(self.dp, new_qos) { + error!( + "{}: failed to set a DDS Writer after creation: {}", + route, e + ); + return RouteStatus::CreationFailure(e); + } + }, + _ => { } // no need to delete writer in any case if no QoS exists already } return RouteStatus::Routed(ke); } diff --git a/zenoh-plugin-dds/src/route_zenoh_dds.rs b/zenoh-plugin-dds/src/route_zenoh_dds.rs index 57360ddc..97f4ea1b 100644 --- a/zenoh-plugin-dds/src/route_zenoh_dds.rs +++ b/zenoh-plugin-dds/src/route_zenoh_dds.rs @@ -13,7 +13,7 @@ // use cyclors::{ - dds_entity_t, dds_qos_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov, dds_get_qos, + dds_entity_t, dds_get_entity_sertype, dds_strretcode, dds_writecdr, ddsi_serdata_from_ser_iov, dds_get_qos, dds_create_qos, ddsi_serdata_kind_SDK_DATA, ddsi_sertype, ddsrt_iovec_t, }; use serde::{Serialize, Serializer}; @@ -354,19 +354,22 @@ impl RouteZenohDDS<'_> { } pub(crate) fn get_qos(&self) -> Option { - let qos_ptr: *mut dds_qos_t = std::ptr::null_mut(); - unsafe { - dds_get_qos(self.dds_writer.load(Ordering::Acquire), qos_ptr); - - if qos_ptr.is_null() { - return Option::None; + let qos = dds_create_qos(); + let ret = dds_get_qos(self.dds_writer.load(Ordering::Relaxed), qos); + if ret == 0 { + return Option::Some(Qos::from_qos_native(qos)); } else { - return Option::Some(Qos::from_qos_native(qos_ptr)); + log::warn!( + "Retrieving QoS failed: {}", + CStr::from_ptr(dds_strretcode(ret)) + .to_str() + .unwrap_or("unrecoverable DDS retcode") + ); + return Option::None; } } } - } fn do_route_data(s: Sample, topic_name: &str, data_writer: dds_entity_t) {