Skip to content

Commit

Permalink
Merge pull request #213 from RosLibRust/support-persistent-flag-ros1-…
Browse files Browse the repository at this point in the history
…services

ROS1 Services support persistent
  • Loading branch information
Carter12s authored Dec 3, 2024
2 parents 63a282b + b15b895 commit 8a5ecbb
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Dropping the last ros1::NodeHandle results in the node cleaning up any advertises, subscriptions, and services with the ROS master.
- Generated code now includes various lint attributes to suppress warnings.
- TCPROS header parsing now ignores (the undocumented fields) response_type and request_type and doesn't produce warnings on them.
- ROS1 native service servers now respect the "persistent" header field, and automatically close the underlying TCP socket after a single request unless persistent is set to 1 in the connection header.

### Changed

Expand Down
1 change: 1 addition & 0 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl Publication {
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
persistent: None,
};
trace!("Publisher connection header: {responding_conn_header:?}");

Expand Down
2 changes: 2 additions & 0 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ impl ServiceClientLink {
service: Some(service_name.to_owned()),
topic_type: service_type.to_owned(),
tcp_nodelay: false,
// We do want a persistent connection to our service clients
persistent: Some(true),
};

let (call_tx, call_rx) = mpsc::unbounded_channel::<CallServiceRequest>();
Expand Down
13 changes: 10 additions & 3 deletions roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ impl ServiceServerLink {
topic: None,
topic_type: service_type.to_string(),
tcp_nodelay: false,
persistent: None,
};
let bytes = response_header.to_bytes(false).unwrap();
if let Err(e) = stream.write_all(&bytes).await {
Expand All @@ -188,9 +189,6 @@ impl ServiceServerLink {
return;
}

// TODO we're not currently reading the persistent flag out of the connection header and treating
// all connections as persistent
// That means we expect one header exchange, and then multiple body exchanges
// Each loop is one body:
loop {
let full_body = match tcpros::receive_body(&mut stream).await {
Expand Down Expand Up @@ -226,6 +224,15 @@ impl ServiceServerLink {
stream.write_all(&full_response).await.unwrap();
}
}

// If a persistent service connection was requested keep requesting bodies
if let Some(true) = connection_header.persistent {
continue;
} else {
// This will result in the task shutting down, dropping the TCP socket and clean shutdown
debug!("Service request connection for {service_name} is not persistent, shutting down");
break;
}
}
}
}
1 change: 1 addition & 0 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl Subscription {
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
persistent: None,
};

Self {
Expand Down
16 changes: 14 additions & 2 deletions roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub struct ConnectionHeader {
pub topic: Option<String>,
pub topic_type: String,
pub tcp_nodelay: bool, // TODO this field should be optional and None for service clients and servers
// TODO service client may include "persistent" here
// TODO service server only has to respond with caller_id (all other fields optional)
pub persistent: Option<bool>,
// TODO service server only has to respond with caller_id (all other fields optional)
}

impl ConnectionHeader {
Expand All @@ -40,6 +40,7 @@ impl ConnectionHeader {
let mut service = None;
let mut topic_type = String::new();
let mut tcp_nodelay = false;
let mut persistent = None;

// TODO: Unhandled: error, persistent
while cursor.position() < header_data.len() as u64 {
Expand Down Expand Up @@ -80,6 +81,10 @@ impl ConnectionHeader {
let mut tcp_nodelay_str = String::new();
field[equals_pos + 1..].clone_into(&mut tcp_nodelay_str);
tcp_nodelay = &tcp_nodelay_str != "0";
} else if field.starts_with("persistent=") {
let mut persistent_str = String::new();
field[equals_pos + 1..].clone_into(&mut persistent_str);
persistent = Some(&persistent_str != "0");
} else if field.starts_with("probe=") {
// probe is apprantly an undocumented header field that is sent
// by certain ros tools when they initiate a service_client connection to a service server
Expand All @@ -106,6 +111,7 @@ impl ConnectionHeader {
service,
topic_type,
tcp_nodelay,
persistent,
};
trace!(
"Got connection header: {header:?} for topic {:?}",
Expand Down Expand Up @@ -162,6 +168,12 @@ impl ConnectionHeader {
header_data.write_u32::<LittleEndian>(topic_type.len() as u32)?;
header_data.write(topic_type.as_bytes())?;

if let Some(persistent) = self.persistent {
let persistent = format!("persistent={}", if persistent { 1 } else { 0 });
header_data.write_u32::<LittleEndian>(persistent.len() as u32)?;
header_data.write(persistent.as_bytes())?;
}

// Now that we know the length, stick its value in the first 4 bytes
let total_length = (header_data.len() - 4) as u32;
for (idx, byte) in total_length.to_le_bytes().iter().enumerate() {
Expand Down

0 comments on commit 8a5ecbb

Please sign in to comment.