From cff5384aa92ab6130479b89a6a0971b44161216e Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Thu, 25 Jan 2024 12:17:52 +0800 Subject: [PATCH] Fix matching_statu_* test failures --- zenoh/tests/matching.rs | 124 ++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 55 deletions(-) diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index cf637ee625..5aefc135a9 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -13,9 +13,11 @@ // use async_std::prelude::FutureExt; use async_std::task; +use std::str::FromStr; use std::time::Duration; use zenoh::prelude::r#async::*; use zenoh_core::zasync_executor_init; +use zenoh_result::ZResult as Result; const TIMEOUT: Duration = Duration::from_secs(60); const RECV_TIMEOUT: Duration = Duration::from_secs(1); @@ -26,200 +28,212 @@ macro_rules! ztimeout { }; } +#[cfg(feature = "unstable")] +async fn create_session_pair(locator: &str) -> Result<(Session, Session)> { + let config1 = { + let mut config = zenoh::config::peer(); + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + config + .listen + .set_endpoints(vec![locator.clone().parse().unwrap()]) + .unwrap(); + config + }; + let config2 = zenoh::config::client([Locator::from_str(&locator)?]); + + let session1 = ztimeout!(zenoh::open(config1).res_async())?; + let session2 = ztimeout!(zenoh::open(config2).res_async())?; + Ok((session1, session2)) +} + #[cfg(feature = "unstable")] #[test] -fn zenoh_matching_status_any() { +fn zenoh_matching_status_any() -> Result<()> { use flume::RecvTimeoutError; task::block_on(async { zasync_executor_init!(); - let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); - - let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await?; let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_any_test") .allowed_destination(Locality::Any) - .res_async()) - .unwrap(); + .res_async())?; - let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap(); + let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); let sub = ztimeout!(session1 .declare_subscriber("zenoh_matching_status_any_test") - .res_async()) - .unwrap(); + .res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(matching_status.matching_subscribers()); - ztimeout!(sub.undeclare().res_async()).unwrap(); + ztimeout!(sub.undeclare().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); let sub = ztimeout!(session2 .declare_subscriber("zenoh_matching_status_any_test") - .res_async()) - .unwrap(); + .res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(matching_status.matching_subscribers()); - ztimeout!(sub.undeclare().res_async()).unwrap(); + ztimeout!(sub.undeclare().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); - }); + Ok(()) + }) } #[cfg(feature = "unstable")] #[test] -fn zenoh_matching_status_remote() { +fn zenoh_matching_status_remote() -> Result<()> { use flume::RecvTimeoutError; task::block_on(async { zasync_executor_init!(); - let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session1 = ztimeout!(zenoh::open(config::peer()).res_async())?; - let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(zenoh::open(config::peer()).res_async())?; let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_remote_test") .allowed_destination(Locality::Remote) - .res_async()) - .unwrap(); + .res_async())?; - let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap(); + let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); let sub = ztimeout!(session1 .declare_subscriber("zenoh_matching_status_remote_test") - .res_async()) - .unwrap(); + .res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); - ztimeout!(sub.undeclare().res_async()).unwrap(); + ztimeout!(sub.undeclare().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); let sub = ztimeout!(session2 .declare_subscriber("zenoh_matching_status_remote_test") - .res_async()) - .unwrap(); + .res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(matching_status.matching_subscribers()); - ztimeout!(sub.undeclare().res_async()).unwrap(); + ztimeout!(sub.undeclare().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); - }); + + Ok(()) + }) } #[cfg(feature = "unstable")] #[test] -fn zenoh_matching_status_local() { +fn zenoh_matching_status_local() -> Result<()> { use flume::RecvTimeoutError; task::block_on(async { zasync_executor_init!(); - let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session1 = ztimeout!(zenoh::open(config::peer()).res_async())?; - let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(zenoh::open(config::peer()).res_async())?; let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_local_test") .allowed_destination(Locality::SessionLocal) - .res_async()) - .unwrap(); + .res_async())?; - let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap(); + let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); let sub = ztimeout!(session1 .declare_subscriber("zenoh_matching_status_local_test") - .res_async()) - .unwrap(); + .res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(matching_status.matching_subscribers()); - ztimeout!(sub.undeclare().res_async()).unwrap(); + ztimeout!(sub.undeclare().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); let sub = ztimeout!(session2 .declare_subscriber("zenoh_matching_status_local_test") - .res_async()) - .unwrap(); + .res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); - ztimeout!(sub.undeclare().res_async()).unwrap(); + ztimeout!(sub.undeclare().res_async())?; let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); - let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + let matching_status = ztimeout!(publisher1.matching_status().res_async())?; assert!(!matching_status.matching_subscribers()); - }); + + Ok(()) + }) }