Skip to content

Commit

Permalink
RoutePublisher: create Reader only on remote announcement
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch committed Oct 6, 2023
1 parent 4498e9d commit 55706ec
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 447 deletions.
156 changes: 2 additions & 154 deletions zenoh-plugin-ros2dds/src/dds_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task;
use cyclors::qos::{History, HistoryKind, Qos};
use cyclors::qos::Qos;
use cyclors::*;
use flume::Sender;
use serde::{Deserialize, Serialize};
Expand All @@ -21,14 +20,8 @@ use std::fmt;
use std::mem::MaybeUninit;
use std::os::raw;
use std::sync::Arc;
use std::time::Duration;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;
use zenoh::Session;
use zenoh_core::SyncResolve;

use crate::dds_types::{DDSRawSample, TypeInfo};
use crate::dds_utils::create_topic;
use crate::dds_types::TypeInfo;
use crate::gid::Gid;

const MAX_SAMPLES: usize = 32;
Expand Down Expand Up @@ -295,148 +288,3 @@ pub fn run_discovery(dp: dds_entity_t, tx: Sender<DDSDiscoveryEvent>) {
);
}
}

unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let pa = arg as *mut (String, KeyExpr, Arc<Session>, CongestionControl);
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
while dds_takecdr(
dr,
&mut zp,
1,
si.as_mut_ptr() as *mut dds_sample_info_t,
DDS_ANY_STATE,
) > 0
{
let si = si.assume_init();
if si[0].valid_data {
let raw_sample = DDSRawSample::create(zp);

if *crate::LOG_PAYLOAD {
log::trace!(
"Route Publisher (DDS:{} -> Zenoh:{}) - routing data - payload: {:02x?}",
&(*pa).0,
&(*pa).1,
raw_sample
);
} else {
log::trace!(
"Route Publisher (DDS:{} -> Zenoh:{}) - routing data - {} bytes",
&(*pa).0,
&(*pa).1,
raw_sample.len()
);
}
let _ = (*pa)
.2
.put(&(*pa).1, &raw_sample)
.congestion_control((*pa).3)
.res_sync();
}
ddsi_serdata_unref(zp);
}
}

#[allow(clippy::too_many_arguments)]
pub fn create_forwarding_dds_reader(
dp: dds_entity_t,
topic_name: String,
type_name: String,
type_info: &Option<Arc<TypeInfo>>,
keyless: bool,
mut qos: Qos,
z_key: KeyExpr,
z: Arc<Session>,
read_period: Option<Duration>,
congestion_ctrl: CongestionControl,
) -> Result<dds_entity_t, String> {
unsafe {
let t = create_topic(dp, &topic_name, &type_name, type_info, keyless);

match read_period {
None => {
// Use a Listener to route data as soon as it arrives
let arg = Box::new((topic_name, z_key, z, congestion_ctrl));
let sub_listener =
dds_create_listener(Box::into_raw(arg) as *mut std::os::raw::c_void);
dds_lset_data_available(sub_listener, Some(data_forwarder_listener));
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(dp, t, qos_native, sub_listener);
Qos::delete_qos_native(qos_native);
if reader >= 0 {
let res = dds_reader_wait_for_historical_data(reader, qos::DDS_100MS_DURATION);
if res < 0 {
log::error!(
"Error calling dds_reader_wait_for_historical_data(): {}",
CStr::from_ptr(dds_strretcode(-res))
.to_str()
.unwrap_or("unrecoverable DDS retcode")
);
}
Ok(reader)
} else {
Err(format!(
"Error creating DDS Reader: {}",
CStr::from_ptr(dds_strretcode(-reader))
.to_str()
.unwrap_or("unrecoverable DDS retcode")
))
}
}
Some(period) => {
// Use a periodic task that takes data to route from a Reader with KEEP_LAST 1
qos.history = Some(History {
kind: HistoryKind::KEEP_LAST,
depth: 1,
});
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(dp, t, qos_native, std::ptr::null());
let z_key = z_key.into_owned();
task::spawn(async move {
// loop while reader's instance handle remain the same
// (if reader was deleted, its dds_entity_t value might have been
// reused by a new entity... don't trust it! Only trust instance handle)
let mut original_handle: dds_instance_handle_t = 0;
dds_get_instance_handle(reader, &mut original_handle);
let mut handle: dds_instance_handle_t = 0;
while dds_get_instance_handle(reader, &mut handle) == DDS_RETCODE_OK as i32 {
if handle != original_handle {
break;
}

async_std::task::sleep(period).await;
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
while dds_takecdr(
reader,
&mut zp,
1,
si.as_mut_ptr() as *mut dds_sample_info_t,
DDS_ANY_STATE,
) > 0
{
let si = si.assume_init();
if si[0].valid_data {
log::trace!(
"Route (periodic) data to zenoh resource with rid={}",
z_key
);

let raw_sample = DDSRawSample::create(zp);

let _ = z
.put(&z_key, &raw_sample)
.congestion_control(congestion_ctrl)
.res_sync();
}
ddsi_serdata_unref(zp);
}
}
});
Ok(reader)
}
}
}
}
17 changes: 6 additions & 11 deletions zenoh-plugin-ros2dds/src/dds_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use crate::{
vec_into_raw_parts,
};

const DDS_ENTITY_NULL: dds_entity_t = 0;
pub const DDS_ENTITY_NULL: dds_entity_t = 0;

// An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities
type AtomicDDSEntity = AtomicI32;
pub type AtomicDDSEntity = AtomicI32;

pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> {
unsafe {
Expand All @@ -47,15 +47,6 @@ pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> {
}
}

pub(crate) fn delete_atomic_dds_entity(entity: &mut AtomicDDSEntity) -> Result<(), String> {
let dds_entity = entity.swap(DDS_ENTITY_NULL, std::sync::atomic::Ordering::Relaxed);
if dds_entity != DDS_ENTITY_NULL {
delete_dds_entity(dds_entity)
} else {
Ok(())
}
}

pub fn get_guid(entity: &dds_entity_t) -> Result<Gid, String> {
unsafe {
let mut guid = dds_guid_t { v: [0; 16] };
Expand All @@ -82,6 +73,10 @@ pub fn serialize_atomic_entity_guid<S>(entity: &AtomicDDSEntity, s: S) -> Result
where
S: Serializer,
{
println!(
"--- serialize_atomic_entity_guid: {}",
entity.load(std::sync::atomic::Ordering::Relaxed)
);
match entity.load(std::sync::atomic::Ordering::Relaxed) {
DDS_ENTITY_NULL => s.serialize_str(""),
entity => serialize_entity_guid(&entity, s),
Expand Down
1 change: 0 additions & 1 deletion zenoh-plugin-ros2dds/src/liveliness_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ pub(crate) fn parse_ke_liveliness_service_cli(
Ok((plugin_id, zenoh_key_expr, ros2_type.to_string()))
}

/////////
pub(crate) fn new_ke_liveliness_action_srv(
plugin_id: &keyexpr,
zenoh_key_expr: &keyexpr,
Expand Down
26 changes: 0 additions & 26 deletions zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,6 @@ impl RosDiscoveryInfoMgr {
*has_changed = true;
}

pub fn add_dds_writers(&self, gids: Vec<Gid>) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
let writer_gid_seq = &mut info
.node_entities_info_seq
.get_mut(&self.node_fullname)
.unwrap()
.writer_gid_seq;
for gid in gids {
writer_gid_seq.insert(gid);
}
*has_changed = true;
}

pub fn remove_dds_writer(&self, gid: Gid) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
info.node_entities_info_seq
Expand All @@ -245,19 +232,6 @@ impl RosDiscoveryInfoMgr {
*has_changed = true;
}

pub fn add_dds_readers(&self, gids: Vec<Gid>) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
let reader_gid_seq = &mut info
.node_entities_info_seq
.get_mut(&self.node_fullname)
.unwrap()
.reader_gid_seq;
for gid in gids {
reader_gid_seq.insert(gid);
}
*has_changed = true;
}

pub fn remove_dds_reader(&self, gid: Gid) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
info.node_entities_info_seq
Expand Down
22 changes: 1 addition & 21 deletions zenoh-plugin-ros2dds/src/route_action_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zenoh::{liveliness::LivelinessToken, prelude::*};
use zenoh_core::AsyncResolve;

use crate::{
gid::Gid, liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*,
liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*,
route_action_srv::serialize_action_zenoh_key_expr, route_service_cli::RouteServiceCli,
route_subscriber::RouteSubscriber, routes_mgr::Context,
};
Expand Down Expand Up @@ -171,26 +171,6 @@ impl RouteActionCli<'_> {
self.liveliness_token = None;
}

pub fn dds_writers_guids(&self) -> Result<Vec<Gid>, String> {
Ok([
self.route_send_goal.dds_rep_writer_guid()?,
self.route_cancel_goal.dds_rep_writer_guid()?,
self.route_get_result.dds_rep_writer_guid()?,
self.route_feedback.dds_writer_guid()?,
self.route_status.dds_writer_guid()?,
]
.into())
}

pub fn dds_readers_guids(&self) -> Result<Vec<Gid>, String> {
Ok([
self.route_send_goal.dds_req_reader_guid()?,
self.route_cancel_goal.dds_req_reader_guid()?,
self.route_get_result.dds_req_reader_guid()?,
]
.into())
}

#[inline]
pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) {
self.route_send_goal.add_remote_route(
Expand Down
24 changes: 2 additions & 22 deletions zenoh-plugin-ros2dds/src/route_action_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use zenoh::{liveliness::LivelinessToken, prelude::*};
use zenoh_core::AsyncResolve;

use crate::{
gid::Gid, liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*,
route_publisher::RoutePublisher, route_service_srv::RouteServiceSrv, routes_mgr::Context,
liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, route_publisher::RoutePublisher,
route_service_srv::RouteServiceSrv, routes_mgr::Context,
};

#[derive(Serialize)]
Expand Down Expand Up @@ -172,26 +172,6 @@ impl RouteActionSrv<'_> {
self.liveliness_token = None;
}

pub fn dds_writers_guids(&self) -> Result<Vec<Gid>, String> {
Ok([
self.route_send_goal.dds_req_writer_guid()?,
self.route_cancel_goal.dds_req_writer_guid()?,
self.route_get_result.dds_req_writer_guid()?,
]
.into())
}

pub fn dds_readers_guids(&self) -> Result<Vec<Gid>, String> {
Ok([
self.route_send_goal.dds_rep_reader_guid()?,
self.route_cancel_goal.dds_rep_reader_guid()?,
self.route_get_result.dds_rep_reader_guid()?,
self.route_feedback.dds_reader_guid()?,
self.route_status.dds_reader_guid()?,
]
.into())
}

#[inline]
pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) {
self.route_send_goal.add_remote_route(
Expand Down
Loading

0 comments on commit 55706ec

Please sign in to comment.