Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create DDS Reader only on remote Subscription #5

Merged
merged 3 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 2 additions & 154 deletions zenoh-plugin-ros2dds/src/dds_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task;
use cyclors::qos::{History, HistoryKind, Qos};
use cyclors::qos::Qos;
use cyclors::*;
use flume::Sender;
use serde::{Deserialize, Serialize};
Expand All @@ -21,14 +20,8 @@ use std::fmt;
use std::mem::MaybeUninit;
use std::os::raw;
use std::sync::Arc;
use std::time::Duration;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;
use zenoh::Session;
use zenoh_core::SyncResolve;

use crate::dds_types::{DDSRawSample, TypeInfo};
use crate::dds_utils::create_topic;
use crate::dds_types::TypeInfo;
use crate::gid::Gid;

const MAX_SAMPLES: usize = 32;
Expand Down Expand Up @@ -295,148 +288,3 @@ pub fn run_discovery(dp: dds_entity_t, tx: Sender<DDSDiscoveryEvent>) {
);
}
}

unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os::raw::c_void) {
let pa = arg as *mut (String, KeyExpr, Arc<Session>, CongestionControl);
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
while dds_takecdr(
dr,
&mut zp,
1,
si.as_mut_ptr() as *mut dds_sample_info_t,
DDS_ANY_STATE,
) > 0
{
let si = si.assume_init();
if si[0].valid_data {
let raw_sample = DDSRawSample::create(zp);

if *crate::LOG_PAYLOAD {
log::trace!(
"Route Publisher (DDS:{} -> Zenoh:{}) - routing data - payload: {:02x?}",
&(*pa).0,
&(*pa).1,
raw_sample
);
} else {
log::trace!(
"Route Publisher (DDS:{} -> Zenoh:{}) - routing data - {} bytes",
&(*pa).0,
&(*pa).1,
raw_sample.len()
);
}
let _ = (*pa)
.2
.put(&(*pa).1, &raw_sample)
.congestion_control((*pa).3)
.res_sync();
}
ddsi_serdata_unref(zp);
}
}

#[allow(clippy::too_many_arguments)]
pub fn create_forwarding_dds_reader(
dp: dds_entity_t,
topic_name: String,
type_name: String,
type_info: &Option<Arc<TypeInfo>>,
keyless: bool,
mut qos: Qos,
z_key: KeyExpr,
z: Arc<Session>,
read_period: Option<Duration>,
congestion_ctrl: CongestionControl,
) -> Result<dds_entity_t, String> {
unsafe {
let t = create_topic(dp, &topic_name, &type_name, type_info, keyless);

match read_period {
None => {
// Use a Listener to route data as soon as it arrives
let arg = Box::new((topic_name, z_key, z, congestion_ctrl));
let sub_listener =
dds_create_listener(Box::into_raw(arg) as *mut std::os::raw::c_void);
dds_lset_data_available(sub_listener, Some(data_forwarder_listener));
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(dp, t, qos_native, sub_listener);
Qos::delete_qos_native(qos_native);
if reader >= 0 {
let res = dds_reader_wait_for_historical_data(reader, qos::DDS_100MS_DURATION);
if res < 0 {
log::error!(
"Error calling dds_reader_wait_for_historical_data(): {}",
CStr::from_ptr(dds_strretcode(-res))
.to_str()
.unwrap_or("unrecoverable DDS retcode")
);
}
Ok(reader)
} else {
Err(format!(
"Error creating DDS Reader: {}",
CStr::from_ptr(dds_strretcode(-reader))
.to_str()
.unwrap_or("unrecoverable DDS retcode")
))
}
}
Some(period) => {
// Use a periodic task that takes data to route from a Reader with KEEP_LAST 1
qos.history = Some(History {
kind: HistoryKind::KEEP_LAST,
depth: 1,
});
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(dp, t, qos_native, std::ptr::null());
let z_key = z_key.into_owned();
task::spawn(async move {
// loop while reader's instance handle remain the same
// (if reader was deleted, its dds_entity_t value might have been
// reused by a new entity... don't trust it! Only trust instance handle)
let mut original_handle: dds_instance_handle_t = 0;
dds_get_instance_handle(reader, &mut original_handle);
let mut handle: dds_instance_handle_t = 0;
while dds_get_instance_handle(reader, &mut handle) == DDS_RETCODE_OK as i32 {
if handle != original_handle {
break;
}

async_std::task::sleep(period).await;
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
#[allow(clippy::uninit_assumed_init)]
let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit();
while dds_takecdr(
reader,
&mut zp,
1,
si.as_mut_ptr() as *mut dds_sample_info_t,
DDS_ANY_STATE,
) > 0
{
let si = si.assume_init();
if si[0].valid_data {
log::trace!(
"Route (periodic) data to zenoh resource with rid={}",
z_key
);

let raw_sample = DDSRawSample::create(zp);

let _ = z
.put(&z_key, &raw_sample)
.congestion_control(congestion_ctrl)
.res_sync();
}
ddsi_serdata_unref(zp);
}
}
});
Ok(reader)
}
}
}
}
21 changes: 20 additions & 1 deletion zenoh-plugin-ros2dds/src/dds_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde::Serializer;
use std::{
ffi::{CStr, CString},
mem::MaybeUninit,
sync::Arc,
sync::{atomic::AtomicI32, Arc},
time::Duration,
};
#[cfg(feature = "dds_shm")]
Expand All @@ -32,6 +32,11 @@ use crate::{
vec_into_raw_parts,
};

pub const DDS_ENTITY_NULL: dds_entity_t = 0;

// An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities
pub type AtomicDDSEntity = AtomicI32;

pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> {
unsafe {
let r = dds_delete(entity);
Expand Down Expand Up @@ -64,6 +69,20 @@ where
}
}

pub fn serialize_atomic_entity_guid<S>(entity: &AtomicDDSEntity, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
println!(
"--- serialize_atomic_entity_guid: {}",
entity.load(std::sync::atomic::Ordering::Relaxed)
);
match entity.load(std::sync::atomic::Ordering::Relaxed) {
DDS_ENTITY_NULL => s.serialize_str(""),
entity => serialize_entity_guid(&entity, s),
}
}

pub fn get_instance_handle(entity: dds_entity_t) -> Result<dds_instance_handle_t, String> {
unsafe {
let mut handle: dds_instance_handle_t = 0;
Expand Down
1 change: 0 additions & 1 deletion zenoh-plugin-ros2dds/src/liveliness_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ pub(crate) fn parse_ke_liveliness_service_cli(
Ok((plugin_id, zenoh_key_expr, ros2_type.to_string()))
}

/////////
pub(crate) fn new_ke_liveliness_action_srv(
plugin_id: &keyexpr,
zenoh_key_expr: &keyexpr,
Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros2dds/src/qos_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pub fn get_durability_service_or_default(qos: &Qos) -> DurabilityService {
}
}

pub fn is_reader_reliable(reliability: &Option<Reliability>) -> bool {
reliability.as_ref().map_or(false, |reliability| {
pub fn is_reliable(qos: &Qos) -> bool {
qos.reliability.as_ref().map_or(false, |reliability| {
reliability.kind == ReliabilityKind::RELIABLE
})
}
Expand Down
52 changes: 0 additions & 52 deletions zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,6 @@ impl RosDiscoveryInfoMgr {
*has_changed = true;
}

pub fn add_dds_writers(&self, gids: Vec<Gid>) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
let writer_gid_seq = &mut info
.node_entities_info_seq
.get_mut(&self.node_fullname)
.unwrap()
.writer_gid_seq;
for gid in gids {
writer_gid_seq.insert(gid);
}
*has_changed = true;
}

pub fn remove_dds_writer(&self, gid: Gid) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
info.node_entities_info_seq
Expand All @@ -235,19 +222,6 @@ impl RosDiscoveryInfoMgr {
*has_changed = true;
}

pub fn remove_dds_writers(&self, gids: Vec<Gid>) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
let writer_gid_seq = &mut info
.node_entities_info_seq
.get_mut(&self.node_fullname)
.unwrap()
.writer_gid_seq;
for gid in gids {
writer_gid_seq.remove(&gid);
}
*has_changed = true;
}

pub fn add_dds_reader(&self, gid: Gid) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
info.node_entities_info_seq
Expand All @@ -258,19 +232,6 @@ impl RosDiscoveryInfoMgr {
*has_changed = true;
}

pub fn add_dds_readers(&self, gids: Vec<Gid>) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
let reader_gid_seq = &mut info
.node_entities_info_seq
.get_mut(&self.node_fullname)
.unwrap()
.reader_gid_seq;
for gid in gids {
reader_gid_seq.insert(gid);
}
*has_changed = true;
}

pub fn remove_dds_reader(&self, gid: Gid) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
info.node_entities_info_seq
Expand All @@ -281,19 +242,6 @@ impl RosDiscoveryInfoMgr {
*has_changed = true;
}

pub fn remove_dds_readers(&self, gids: Vec<Gid>) {
let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state);
let reader_gid_seq = &mut info
.node_entities_info_seq
.get_mut(&self.node_fullname)
.unwrap()
.reader_gid_seq;
for gid in gids {
reader_gid_seq.remove(&gid);
}
*has_changed = true;
}

pub fn read(&self) -> Vec<ParticipantEntitiesInfo> {
unsafe {
let mut zp: *mut ddsi_serdata = std::ptr::null_mut();
Expand Down
Loading
Loading