Skip to content

Commit

Permalink
Use new Zenoh Querier
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch committed Dec 9, 2024
1 parent f4990b1 commit 2719ac0
Showing 1 changed file with 24 additions and 22 deletions.
46 changes: 24 additions & 22 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use zenoh::{
internal::buffers::{Buffer, ZBuf, ZSlice},
key_expr::{keyexpr, OwnedKeyExpr},
liveliness::LivelinessToken,
query::Reply,
query::{Querier, Reply},
sample::Locality,
Session, Wait,
Wait,
};

use crate::{
Expand Down Expand Up @@ -66,6 +66,8 @@ pub struct RouteServiceCli {
// the context
#[serde(skip)]
context: Context,
#[serde(skip)]
zenoh_querier: Arc<Querier<'static>>,
#[serde(serialize_with = "crate::config::serialize_duration_as_f32")]
queries_timeout: Duration,
is_active: bool,
Expand Down Expand Up @@ -113,12 +115,25 @@ impl RouteServiceCli {
tracing::debug!(
"Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type} (queries_timeout={queries_timeout:#?})"
);

let zenoh_querier: Arc<Querier<'static>> = Arc::new(
context
.zsession
.declare_querier(zenoh_key_expr.clone())
.congestion_control(zenoh::qos::CongestionControl::Block)
.allowed_destination(Locality::Remote)
.timeout(queries_timeout)
.await
.map_err(|e| format!("Failed create Querier for key {zenoh_key_expr}: {e}",))?,
);

Ok(RouteServiceCli {
ros2_name,
ros2_type,
zenoh_key_expr,
type_info,
context,
zenoh_querier,
queries_timeout,
is_active: false,
rep_writer: Arc::new(DDS_ENTITY_NULL.into()),
Expand Down Expand Up @@ -207,9 +222,7 @@ impl RouteServiceCli {
let route_id: String = self.to_string();
let req_topic_name = format!("rq{}Request", self.ros2_name);
let req_type_name = ros2_service_type_to_request_dds_type(&self.ros2_type);
let zenoh_key_expr2 = self.zenoh_key_expr.clone();
let zsession2 = self.context.zsession.clone();
let queries_timeout = self.queries_timeout;
let zquerier = self.zenoh_querier.clone();
let req_reader = create_dds_reader(
self.context.participant,
req_topic_name,
Expand All @@ -219,14 +232,7 @@ impl RouteServiceCli {
qos,
None,
move |sample| {
route_dds_request_to_zenoh(
&route_id,
sample,
&zenoh_key_expr2,
&zsession2,
queries_timeout,
rep_writer,
);
route_dds_request_to_zenoh(&route_id, sample, &zquerier, rep_writer);
},
)?;
let old = self.req_reader.swap(req_reader, Ordering::Relaxed);
Expand Down Expand Up @@ -345,9 +351,7 @@ impl RouteServiceCli {
fn route_dds_request_to_zenoh(
route_id: &str,
sample: &DDSRawSample,
zenoh_key_expr: &OwnedKeyExpr,
zsession: &Arc<Session>,
query_timeout: Duration,
querier: &Arc<Querier<'static>>,
rep_writer: dds_entity_t,
) {
// Request payload is expected to be the Request type encoded as CDR, including a 4 bytes CDR header,
Expand Down Expand Up @@ -381,20 +385,18 @@ fn route_dds_request_to_zenoh(
zenoh_req_buf.push_zslice(payload);

if *LOG_PAYLOAD {
tracing::debug!("{route_id}: routing request {request_id} from DDS to Zenoh (timeout:{query_timeout:#?}) - payload: {zenoh_req_buf:02x?}");
tracing::debug!("{route_id}: routing request {request_id} from DDS to Zenoh - payload: {zenoh_req_buf:02x?}");
} else {
tracing::trace!(
"{route_id}: routing request {request_id} from DDS to Zenoh (timeout:{query_timeout:#?}) - {} bytes",
"{route_id}: routing request {request_id} from DDS to Zenoh - {} bytes",
zenoh_req_buf.len()
);
}

if let Err(e) = zsession
.get(zenoh_key_expr)
if let Err(e) = querier
.get()
.payload(zenoh_req_buf)
.attachment(request_id.as_attachment())
.allowed_destination(Locality::Remote)
.timeout(query_timeout)
.with({
let route_id1: String = route_id.to_string();
let route_id2 = route_id.to_string();
Expand Down

0 comments on commit 2719ac0

Please sign in to comment.