Skip to content

Commit

Permalink
Fix routing loop of Service request when client and server are served…
Browse files Browse the repository at this point in the history
… by the same bridge (#43)
  • Loading branch information
JEnoch committed Jan 22, 2024
1 parent a094d7e commit 6641c70
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
32 changes: 21 additions & 11 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl RouteServiceCli<'_> {
qos,
None,
move |sample| {
do_route_request(
route_dds_request_to_zenoh(
&route_id,
sample,
&zenoh_key_expr2,
Expand Down Expand Up @@ -275,7 +275,7 @@ impl RouteServiceCli<'_> {
}
}

fn do_route_request(
fn route_dds_request_to_zenoh(
route_id: &str,
sample: &DDSRawSample,
zenoh_key_expr: &OwnedKeyExpr,
Expand Down Expand Up @@ -304,10 +304,10 @@ fn do_route_request(
zenoh_req_buf.push_zslice(slice.subslice(20, slice.len()).unwrap());

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing request {request_id:02x?} to Zenoh - payload: {zenoh_req_buf:02x?}");
log::debug!("{route_id}: routing request {request_id:02x?} from DDS to Zenoh - payload: {zenoh_req_buf:02x?}");
} else {
log::trace!(
"{route_id}: routing request {request_id:02x?} to Zenoh - {} bytes",
"{route_id}: routing request {request_id:02x?} from DDS to Zenoh - {} bytes",
zenoh_req_buf.len()
);
}
Expand All @@ -316,21 +316,29 @@ fn do_route_request(
if let Err(e) = zsession
.get(zenoh_key_expr)
.with_value(zenoh_req_buf)
.allowed_destination(Locality::Remote)
.timeout(queries_timeout)
.callback(move |reply| do_route_reply(route_id2.clone(), reply, request_id, rep_writer))
.callback(move |reply| {
route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer)
})
.res_sync()
{
log::warn!("{route_id}: routing request {request_id:02x?} to Zenoh failed: {e}");
log::warn!("{route_id}: routing request {request_id:02x?} from DDS to Zenoh failed: {e}");
}
}

fn do_route_reply(route_id: String, reply: Reply, request_id: [u8; 16], rep_writer: dds_entity_t) {
fn route_zenoh_reply_to_dds(
route_id: String,
reply: Reply,
request_id: [u8; 16],
rep_writer: dds_entity_t,
) {
match reply.sample {
Ok(sample) => {
let zenoh_rep_buf = sample.payload.contiguous();
if zenoh_rep_buf.len() < 4 || zenoh_rep_buf[1] > 1 {
log::warn!(
"{route_id}: received invalid reply for {request_id:02x?}: {zenoh_rep_buf:0x?}"
"{route_id}: received invalid reply from Zenoh for {request_id:02x?}: {zenoh_rep_buf:0x?}"
);
return;
}
Expand All @@ -344,16 +352,18 @@ fn do_route_reply(route_id: String, reply: Reply, request_id: [u8; 16], rep_writ
dds_rep_buf.extend_from_slice(&zenoh_rep_buf[4..]);

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing reply for {request_id:02x?} to Client - payload: {dds_rep_buf:02x?}");
log::debug!("{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS - payload: {dds_rep_buf:02x?}");
} else {
log::trace!(
"{route_id}: routing reply for {request_id:02x?} to Client - {} bytes",
"{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS - {} bytes",
dds_rep_buf.len()
);
}

if let Err(e) = dds_write(rep_writer, dds_rep_buf) {
log::warn!("{route_id}: routing reply for {request_id:02x?} failed: {e}");
log::warn!(
"{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS failed: {e}"
);
}
}
Err(val) => {
Expand Down
26 changes: 14 additions & 12 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl RouteServiceSrv<'_> {
let queries_in_progress = queries_in_progress.clone();
let zenoh_key_expr = zenoh_key_expr.clone();
move |sample| {
do_route_reply(
route_dds_reply_to_zenoh(
sample,
zenoh_key_expr.clone(),
&mut zwrite!(queries_in_progress),
Expand Down Expand Up @@ -236,7 +236,7 @@ impl RouteServiceSrv<'_> {
.zsession
.declare_queryable(&self.zenoh_key_expr)
.callback(move |query| {
do_route_request(
route_zenoh_request_to_dds(
query,
&mut zwrite!(queries_in_progress),
&sequence_number,
Expand Down Expand Up @@ -341,7 +341,7 @@ impl RouteServiceSrv<'_> {

const CDR_HEADER_LE: [u8; 4] = [0, 1, 0, 0];

fn do_route_request(
fn route_zenoh_request_to_dds(
query: Query,
queries_in_progress: &mut HashMap<u64, Query>,
sequence_number: &AtomicU64,
Expand Down Expand Up @@ -389,22 +389,24 @@ fn do_route_request(
};

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing request #{n} to Service - payload: {dds_req_buf:02x?}");
log::debug!(
"{route_id}: routing request #{n} from Zenoh to DDS - payload: {dds_req_buf:02x?}"
);
} else {
log::trace!(
"{route_id}: routing request #{n} to Service - {} bytes",
"{route_id}: routing request #{n} from Zenoh to DDS - {} bytes",
dds_req_buf.len()
);
}

queries_in_progress.insert(n, query);
if let Err(e) = dds_write(req_writer, dds_req_buf) {
log::warn!("{route_id}: routing request failed: {e}");
log::warn!("{route_id}: routing request from Zenoh to DDS failed: {e}");
queries_in_progress.remove(&n);
}
}

fn do_route_reply(
fn route_dds_reply_to_zenoh(
sample: &DDSRawSample,
zenoh_key_expr: OwnedKeyExpr,
queries_in_progress: &mut HashMap<u64, Query>,
Expand All @@ -415,7 +417,7 @@ fn do_route_reply(
// the client guid (8 bytes) and a sequence_number (8 bytes). As per rmw_cyclonedds here:
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
if sample.len() < 20 {
log::warn!("{route_id}: received invalid response: {sample:0x?}");
log::warn!("{route_id}: received invalid response from DDS: {sample:0x?}");
return;
}

Expand Down Expand Up @@ -452,10 +454,10 @@ fn do_route_reply(
zenoh_rep_buf.push_zslice(slice.subslice(20, slice.len()).unwrap());

if *LOG_PAYLOAD {
log::debug!("{route_id}: routing reply #{seq_num} to Client - payload: {zenoh_rep_buf:02x?}");
log::debug!("{route_id}: routing reply #{seq_num} from DDS to Zenoh - payload: {zenoh_rep_buf:02x?}");
} else {
log::trace!(
"{route_id}: routing reply #{seq_num} to Client - {} bytes",
"{route_id}: routing reply #{seq_num} from DDS to Zenoh - {} bytes",
zenoh_rep_buf.len()
);
}
Expand All @@ -464,11 +466,11 @@ fn do_route_reply(
.reply(Ok(Sample::new(zenoh_key_expr, zenoh_rep_buf)))
.res_sync()
{
log::warn!("{route_id}: routing reply for request #{seq_num} failed: {e}");
log::warn!("{route_id}: routing reply for request #{seq_num} from DDS to Zenoh failed: {e}");
}
}
None => log::warn!(
"{route_id}: received response an unknown query (already dropped?): #{seq_num}"
"{route_id}: received response from DDS an unknown query (already timed out ?): #{seq_num}"
),
}
}

0 comments on commit 6641c70

Please sign in to comment.