From eff15daa48682faaa908604c227fdee85eb150b5 Mon Sep 17 00:00:00 2001 From: Carter Date: Fri, 28 Jun 2024 20:53:57 -0600 Subject: [PATCH 1/3] Various service server cleanups --- roslibrust/examples/ros1_service_server.rs | 80 +++++++ roslibrust/src/ros1/service_client.rs | 40 ++++ roslibrust/src/ros1/service_server.rs | 233 ++++++++++++--------- 3 files changed, 252 insertions(+), 101 deletions(-) create mode 100644 roslibrust/examples/ros1_service_server.rs diff --git a/roslibrust/examples/ros1_service_server.rs b/roslibrust/examples/ros1_service_server.rs new file mode 100644 index 00000000..3db5dbe4 --- /dev/null +++ b/roslibrust/examples/ros1_service_server.rs @@ -0,0 +1,80 @@ +roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_interfaces"); + +#[cfg(feature = "ros1")] +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + use log::*; + use roslibrust::ros1::NodeHandle; + use std::sync::{Arc, Mutex}; + + simple_logger::SimpleLogger::new() + .with_level(log::LevelFilter::Debug) + .without_timestamps() + .init() + .unwrap(); + + let nh = NodeHandle::new("http://localhost:11311", "service_server_rs").await?; + log::info!("Connected!"); + + // Because our service server can run from any thread at any time + // and *multiple service requests can process in parallel* + // the service function is not allowed to hold any references to external data + // all data must be owned by the service function + // See: https://tokio.rs/tokio/tutorial/spawning + // To work around this we want to move a "protected" version of our state into the function + let bool_state = Arc::new(Mutex::new(false)); + + let bool_state_copy = bool_state.clone(); // Make a copy (of the Arc, not the contents!) + let server_fn = move |request: std_srvs::SetBoolRequest| { + log::info!("Got request to set bool: {request:?}"); + // Actually set the bool + *bool_state_copy.lock().unwrap() = request.data; + Ok(std_srvs::SetBoolResponse { + success: true, + message: "You set my bool!".to_string(), + }) + }; + + // Start our service running! + let _handle = nh + .advertise_service::("~/my_set_bool", server_fn) + .await?; + info!("Service has started"); + + // Setup a task to kill this process when ctrl_c comes in: + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + std::process::exit(0); + }); + + // As long as _handle is kept alive our service will continue to run + + // For funsies we can also spawn a task to periodically call our service + // TODO something broken with naming here + let service_client = nh + .service_client::("~my_set_bool") + .await?; + tokio::spawn(async move { + let mut bool = false; + loop { + bool = !bool; + service_client + .call(&std_srvs::SetBoolRequest { data: bool }) + .await + .unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } + }); + + // We can still access our shared state, we just have to do it safely + loop { + let cur_bool = *bool_state.lock().unwrap(); + info!("Current value of our bool: {cur_bool}"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } +} + +#[cfg(not(feature = "ros1"))] +fn main() { + eprintln!("This example does nothing without compiling with the feature 'ros1'"); +} diff --git a/roslibrust/src/ros1/service_client.rs b/roslibrust/src/ros1/service_client.rs index b06ac9bb..5fd01ccf 100644 --- a/roslibrust/src/ros1/service_client.rs +++ b/roslibrust/src/ros1/service_client.rs @@ -293,4 +293,44 @@ mod test { panic!("Unexpected error type"); } } + + #[test_log::test(tokio::test)] + async fn persistent_client_can_be_called_multiple_times() { + let nh = NodeHandle::new( + "http://localhost:11311", + "/persistent_client_can_be_called_multiple_times", + ) + .await + .unwrap(); + + let server_fn = |request: test_msgs::AddTwoIntsRequest| { + Ok(test_msgs::AddTwoIntsResponse { + sum: request.a + request.b, + }) + }; + + let _handle = nh + .advertise_service::( + "/persistent_client_can_be_called_multiple_times/add_two", + server_fn, + ) + .await + .unwrap(); + + let client = nh + .service_client::( + "/persistent_client_can_be_called_multiple_times/add_two", + ) + .await + .unwrap(); + + for i in 0..10 { + let call: test_msgs::AddTwoIntsResponse = client + .call(&test_msgs::AddTwoIntsRequest { a: 1, b: i }) + .await + .unwrap(); + + assert_eq!(call.sum, 1 + i); + } + } } diff --git a/roslibrust/src/ros1/service_server.rs b/roslibrust/src/ros1/service_server.rs index 064a5a17..ec0c2597 100644 --- a/roslibrust/src/ros1/service_server.rs +++ b/roslibrust/src/ros1/service_server.rs @@ -1,4 +1,7 @@ -use std::net::{Ipv4Addr, SocketAddr}; +use std::{ + net::{Ipv4Addr, SocketAddr}, + sync::Arc, +}; use abort_on_drop::ChildTask; use log::*; @@ -13,6 +16,11 @@ use super::{names::Name, NodeHandle}; // https://doc.rust-lang.org/beta/unstable-book/language-features/trait-alias.html // trait ServerFunction = Fn(T::Request) -> Err(T::Response, Box) + Send + Sync + 'static; +type TypeErasedCallback = dyn Fn(Vec) -> Result, Box> + + Send + + Sync + + 'static; + /// ServiceServer is simply a lifetime control /// The underlying ServiceServer is kept alive while object is kept alive. /// Dropping this object, un-advertises the underlying service with rosmaster @@ -52,12 +60,7 @@ pub(crate) struct ServiceServerLink { impl ServiceServerLink { pub(crate) async fn new( - method: Box< - dyn Fn(Vec) -> Result, Box> - + Send - + Sync - + 'static, - >, + method: Box, host_addr: Ipv4Addr, service_name: Name, node_name: Name, @@ -81,6 +84,8 @@ impl ServiceServerLink { }) } + /// Need to provide the port the server is listening on to the rest of the application + /// so we can inform rosmaster of the full URI of where this service is located pub(crate) fn port(&self) -> u16 { self.port } @@ -91,111 +96,137 @@ impl ServiceServerLink { listener: tokio::net::TcpListener, service_name: Name, // Service path of the this service node_name: Name, // Name of node we're running on - method: Box< - dyn Fn(Vec) -> Result, Box> - + Send - + Sync - + 'static, - >, + method: Box, ) { + // We have to move our callback into an Arc so the separately spawned tasks for each service connection + // can access it in parrallel and not worry about the lifetime. + // TODO: it may be better to Arc it upfront? + let arc_method = Arc::new(method); loop { // Accept new TCP connections match listener.accept().await { - Ok((mut stream, peer_addr)) => { - // TODO for a bunch of the error branches in this handling - // it is unclear whether we should respond over the socket - // with an error or not? - // Probably it is better to try to send an error back? - debug!( - "Received service_request connection from {peer_addr} for {service_name}" - ); - - // Get the header from the stream: - let mut header_len_bytes = [0u8; 4]; - if let Err(e) = stream.read_exact(&mut header_len_bytes).await { - warn!("Communication error while handling service request connection for {service_name}, could not get header length: {e:?}"); - continue; - } - let header_len = u32::from_le_bytes(header_len_bytes) as usize; - - let mut connection_header = vec![0u8; header_len]; - if let Err(e) = stream.read_exact(&mut connection_header).await { - warn!("Communication error while handling service request connection for {service_name}, could not get header body: {e:?}"); - continue; - } - let connection_header = match ConnectionHeader::from_bytes(&connection_header) { - Ok(header) => header, - Err(e) => { - warn!("Communication error while handling service request connection for {service_name}, could not parse header: {e:?}"); - continue; - } - }; - trace!( - "Got connection header: {connection_header:#?} for service {service_name}" - ); - - // Respond with our header - // TODO this is pretty cursed, may want a better version - let response_header = ConnectionHeader { - caller_id: node_name.to_string(), - latching: false, - msg_definition: "".to_string(), - md5sum: "".to_string(), - service: None, - topic: None, - topic_type: "".to_string(), - tcp_nodelay: false, - }; - let bytes = response_header.to_bytes(false).unwrap(); - if let Err(e) = stream.write_all(&bytes).await { - warn!("Communication error while handling service request connection for {service_name}, could not write response header: {e:?}"); - continue; - } - - let mut body_len_bytes = [0u8; 4]; - if let Err(e) = stream.read_exact(&mut body_len_bytes).await { - warn!("Communication error while handling service request connection for {service_name}, could not get body length: {e:?}"); - continue; - } - let body_len = u32::from_le_bytes(body_len_bytes) as usize; - trace!("Got body length {body_len} for service {service_name}"); - - let mut body = vec![0u8; body_len]; - if let Err(e) = stream.read_exact(&mut body).await { - warn!("Communication error while handling service request connection for {service_name}, could not get body: {e:?}"); - continue; - } - trace!("Got body for service {service_name}: {body:#?}"); - - // Okay this is funky and I should be able to do better here - // serde_rosmsg expects the length at the front - let full_body = [body_len_bytes.to_vec(), body].concat(); - - let response = (method)(full_body); - - match response { - Ok(response) => { - // MAJOR TODO: handle error here - - // Another funky thing here - // services have to respond with one extra byte at the front - // to indicate success - let full_response = [vec![1u8], response].concat(); - - stream.write_all(&full_response).await.unwrap(); - } - Err(e) => { - warn!("Error from user service method for {service_name}: {e:?}"); - // MAJOR TODO: respond with error - } - } + Ok((stream, peer_addr)) => { + tokio::spawn(Self::handle_tcp_connection( + stream, + peer_addr, + service_name.clone(), + node_name.clone(), + arc_method.clone(), + )); } Err(e) => { + // Not entirely sure what circumstances can cause this? + // Loss of networking functionality on the host? warn!("Error accepting TCP connection for service {service_name}: {e:?}"); } }; } } + + /// Each TCP connection made to the service server is processed in a separate task + /// This function handles a single TCP connection + async fn handle_tcp_connection( + mut stream: tokio::net::TcpStream, + peer_addr: SocketAddr, + service_name: Name, + node_name: Name, + method: Arc>, + ) { + // TODO for a bunch of the error branches in this handling + // it is unclear whether we should respond over the socket + // with an error or not? + // Probably it is better to try to send an error back? + debug!("Received service_request connection from {peer_addr} for {service_name}"); + + // Get the header from the stream: + let mut header_len_bytes = [0u8; 4]; + if let Err(e) = stream.read_exact(&mut header_len_bytes).await { + warn!("Communication error while handling service request connection for {service_name}, could not get header length: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + let header_len = u32::from_le_bytes(header_len_bytes) as usize; + + let mut connection_header = vec![0u8; header_len]; + if let Err(e) = stream.read_exact(&mut connection_header).await { + warn!("Communication error while handling service request connection for {service_name}, could not get header body: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + let connection_header = match ConnectionHeader::from_bytes(&connection_header) { + Ok(header) => header, + Err(e) => { + warn!("Communication error while handling service request connection for {service_name}, could not parse header: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + }; + trace!("Got connection header: {connection_header:#?} for service {service_name}"); + + // Respond with our header + // TODO this is pretty cursed, may want a better version + let response_header = ConnectionHeader { + caller_id: node_name.to_string(), + latching: false, + msg_definition: "".to_string(), + md5sum: "".to_string(), + service: None, + topic: None, + topic_type: "".to_string(), + tcp_nodelay: false, + }; + let bytes = response_header.to_bytes(false).unwrap(); + if let Err(e) = stream.write_all(&bytes).await { + warn!("Communication error while handling service request connection for {service_name}, could not write response header: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + + // TODO we're not currently reading the persistent flag out of the connection header and treating + // all connections as persistent + // TODO: NEED TO VERIFY THIS WITH ROSPY / ROSCPP + // That means we expect one header exchange, and then multiple body exchanges + + let mut body_len_bytes = [0u8; 4]; + if let Err(e) = stream.read_exact(&mut body_len_bytes).await { + warn!("Communication error while handling service request connection for {service_name}, could not get body length: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + let body_len = u32::from_le_bytes(body_len_bytes) as usize; + trace!("Got body length {body_len} for service {service_name}"); + + let mut body = vec![0u8; body_len]; + if let Err(e) = stream.read_exact(&mut body).await { + warn!("Communication error while handling service request connection for {service_name}, could not get body: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + trace!("Got body for service {service_name}: {body:#?}"); + + // Okay this is funky and I should be able to do better here + // serde_rosmsg expects the length at the front + let full_body = [body_len_bytes.to_vec(), body].concat(); + + let response = (method)(full_body); + + match response { + Ok(response) => { + // MAJOR TODO: handle error here + + // Another funky thing here + // services have to respond with one extra byte at the front + // to indicate success + let full_response = [vec![1u8], response].concat(); + + stream.write_all(&full_response).await.unwrap(); + } + Err(e) => { + warn!("Error from user service method for {service_name}: {e:?}"); + // MAJOR TODO: respond with error + } + } + } } #[cfg(feature = "ros1_test")] From f55d6b91bce163f5140be5d87aef4f6096127a51 Mon Sep 17 00:00:00 2001 From: Carter Date: Fri, 28 Jun 2024 21:35:32 -0600 Subject: [PATCH 2/3] service server can handle multiple queries in one TCP stream --- roslibrust/examples/ros1_service_client.rs | 10 +++- roslibrust/examples/ros1_service_server.rs | 3 +- roslibrust/src/ros1/service_server.rs | 67 +++++++++++----------- 3 files changed, 42 insertions(+), 38 deletions(-) diff --git a/roslibrust/examples/ros1_service_client.rs b/roslibrust/examples/ros1_service_client.rs index 6c058209..27074fbe 100644 --- a/roslibrust/examples/ros1_service_client.rs +++ b/roslibrust/examples/ros1_service_client.rs @@ -14,14 +14,18 @@ async fn main() -> Result<(), anyhow::Error> { let nh = NodeHandle::new("http://localhost:11311", "service_client_rs").await?; log::info!("Connected!"); - let response: rosapi::GetTimeResponse = nh + let client = nh .service_client::("rosapi/get_time") - .await? - .call(&rosapi::GetTimeRequest {}) .await?; + let response = client.call(&rosapi::GetTimeRequest {}).await?; + log::info!("Got time: {:?}", response); + let response2 = client.call(&rosapi::GetTimeRequest {}).await?; + + log::info!("Got time: {:?}", response2); + Ok(()) } diff --git a/roslibrust/examples/ros1_service_server.rs b/roslibrust/examples/ros1_service_server.rs index 3db5dbe4..016f826f 100644 --- a/roslibrust/examples/ros1_service_server.rs +++ b/roslibrust/examples/ros1_service_server.rs @@ -50,9 +50,8 @@ async fn main() -> Result<(), anyhow::Error> { // As long as _handle is kept alive our service will continue to run // For funsies we can also spawn a task to periodically call our service - // TODO something broken with naming here let service_client = nh - .service_client::("~my_set_bool") + .service_client::("~/my_set_bool") .await?; tokio::spawn(async move { let mut bool = false; diff --git a/roslibrust/src/ros1/service_server.rs b/roslibrust/src/ros1/service_server.rs index ec0c2597..cb4b58c5 100644 --- a/roslibrust/src/ros1/service_server.rs +++ b/roslibrust/src/ros1/service_server.rs @@ -184,46 +184,47 @@ impl ServiceServerLink { // TODO we're not currently reading the persistent flag out of the connection header and treating // all connections as persistent - // TODO: NEED TO VERIFY THIS WITH ROSPY / ROSCPP // That means we expect one header exchange, and then multiple body exchanges + // Each loop is one body: + loop { + let mut body_len_bytes = [0u8; 4]; + if let Err(e) = stream.read_exact(&mut body_len_bytes).await { + warn!("Communication error while handling service request connection for {service_name}, could not get body length: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + let body_len = u32::from_le_bytes(body_len_bytes) as usize; + trace!("Got body length {body_len} for service {service_name}"); - let mut body_len_bytes = [0u8; 4]; - if let Err(e) = stream.read_exact(&mut body_len_bytes).await { - warn!("Communication error while handling service request connection for {service_name}, could not get body length: {e:?}"); - // TODO returning here simply closes the socket? Should we respond with an error instead? - return; - } - let body_len = u32::from_le_bytes(body_len_bytes) as usize; - trace!("Got body length {body_len} for service {service_name}"); - - let mut body = vec![0u8; body_len]; - if let Err(e) = stream.read_exact(&mut body).await { - warn!("Communication error while handling service request connection for {service_name}, could not get body: {e:?}"); - // TODO returning here simply closes the socket? Should we respond with an error instead? - return; - } - trace!("Got body for service {service_name}: {body:#?}"); + let mut body = vec![0u8; body_len]; + if let Err(e) = stream.read_exact(&mut body).await { + warn!("Communication error while handling service request connection for {service_name}, could not get body: {e:?}"); + // TODO returning here simply closes the socket? Should we respond with an error instead? + return; + } + trace!("Got body for service {service_name}: {body:#?}"); - // Okay this is funky and I should be able to do better here - // serde_rosmsg expects the length at the front - let full_body = [body_len_bytes.to_vec(), body].concat(); + // Okay this is funky and I should be able to do better here + // serde_rosmsg expects the length at the front + let full_body = [body_len_bytes.to_vec(), body].concat(); - let response = (method)(full_body); + let response = (method)(full_body); - match response { - Ok(response) => { - // MAJOR TODO: handle error here + match response { + Ok(response) => { + // MAJOR TODO: handle error here - // Another funky thing here - // services have to respond with one extra byte at the front - // to indicate success - let full_response = [vec![1u8], response].concat(); + // Another funky thing here + // services have to respond with one extra byte at the front + // to indicate success + let full_response = [vec![1u8], response].concat(); - stream.write_all(&full_response).await.unwrap(); - } - Err(e) => { - warn!("Error from user service method for {service_name}: {e:?}"); - // MAJOR TODO: respond with error + stream.write_all(&full_response).await.unwrap(); + } + Err(e) => { + warn!("Error from user service method for {service_name}: {e:?}"); + // MAJOR TODO: respond with error + } } } } From 292f5cf4a63661ac13baeefca391e12cd19a75c1 Mon Sep 17 00:00:00 2001 From: Carter Date: Fri, 28 Jun 2024 21:37:20 -0600 Subject: [PATCH 3/3] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 47d65fa0..3b1e76bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added +- ROS1 native service servers and service clients are now supported (experimental feature) ### Fixed