Skip to content

Commit

Permalink
Fix matching_status_* failures
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Jan 25, 2024
1 parent e57c2c2 commit 0d9390f
Showing 1 changed file with 65 additions and 56 deletions.
121 changes: 65 additions & 56 deletions zenoh/tests/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,195 +11,204 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::str::FromStr;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh_core::ztimeout;
use zenoh_result::ZResult as Result;

const TIMEOUT: Duration = Duration::from_secs(60);
const RECV_TIMEOUT: Duration = Duration::from_secs(1);

#[cfg(feature = "unstable")]
// #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_matching_status_any() {
use flume::RecvTimeoutError;
async fn create_session_pair(locator: &str) -> Result<(Session, Session)> {
let config1 = {
let mut config = zenoh::config::peer();
config.scouting.multicast.set_enabled(Some(false)).unwrap();
config
.listen
.set_endpoints(vec![locator.clone().parse().unwrap()])
.unwrap();
config
};
let config2 = zenoh::config::client([Locator::from_str(&locator)?]);

let session1 = ztimeout!(zenoh::open(config1).res_async())?;
let session2 = ztimeout!(zenoh::open(config2).res_async())?;
Ok((session1, session2))
}

let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_matching_status_any() -> Result<()> {
use flume::RecvTimeoutError;

let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await?;

let publisher1 = ztimeout!(session1
.declare_publisher("zenoh_matching_status_any_test")
.allowed_destination(Locality::Any)
.res_async())
.unwrap();
.res_async())?;

let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap();
let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1
.declare_subscriber("zenoh_matching_status_any_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2
.declare_subscriber("zenoh_matching_status_any_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

Ok(())
}

#[cfg(feature = "unstable")]
// #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_matching_status_remote() {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_matching_status_remote() -> Result<()> {
use flume::RecvTimeoutError;

let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();

let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let (session1, session2) = create_session_pair("tcp/127.0.0.1:18002").await?;

let publisher1 = ztimeout!(session1
.declare_publisher("zenoh_matching_status_remote_test")
.allowed_destination(Locality::Remote)
.res_async())
.unwrap();
.res_async())?;

let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap();
let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1
.declare_subscriber("zenoh_matching_status_remote_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2
.declare_subscriber("zenoh_matching_status_remote_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());
Ok(())
}

#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_matching_status_local() {
async fn zenoh_matching_status_local() -> Result<()> {
use flume::RecvTimeoutError;

let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();

let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let (session1, session2) = create_session_pair("tcp/127.0.0.1:18003").await?;

let publisher1 = ztimeout!(session1
.declare_publisher("zenoh_matching_status_local_test")
.allowed_destination(Locality::SessionLocal)
.res_async())
.unwrap();
.res_async())?;

let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap();
let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1
.declare_subscriber("zenoh_matching_status_local_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2
.declare_subscriber("zenoh_matching_status_local_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());
Ok(())
}

0 comments on commit 0d9390f

Please sign in to comment.