Skip to content

Commit

Permalink
Fix matching_statu_* test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Jan 25, 2024
1 parent 8cd786f commit cff5384
Showing 1 changed file with 69 additions and 55 deletions.
124 changes: 69 additions & 55 deletions zenoh/tests/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
//
use async_std::prelude::FutureExt;
use async_std::task;
use std::str::FromStr;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh_core::zasync_executor_init;
use zenoh_result::ZResult as Result;

const TIMEOUT: Duration = Duration::from_secs(60);
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
Expand All @@ -26,200 +28,212 @@ macro_rules! ztimeout {
};
}

#[cfg(feature = "unstable")]
async fn create_session_pair(locator: &str) -> Result<(Session, Session)> {
let config1 = {
let mut config = zenoh::config::peer();
config.scouting.multicast.set_enabled(Some(false)).unwrap();
config
.listen
.set_endpoints(vec![locator.clone().parse().unwrap()])
.unwrap();
config
};
let config2 = zenoh::config::client([Locator::from_str(&locator)?]);

let session1 = ztimeout!(zenoh::open(config1).res_async())?;
let session2 = ztimeout!(zenoh::open(config2).res_async())?;
Ok((session1, session2))
}

#[cfg(feature = "unstable")]
#[test]
fn zenoh_matching_status_any() {
fn zenoh_matching_status_any() -> Result<()> {
use flume::RecvTimeoutError;

task::block_on(async {
zasync_executor_init!();

let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();

let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await?;

let publisher1 = ztimeout!(session1
.declare_publisher("zenoh_matching_status_any_test")
.allowed_destination(Locality::Any)
.res_async())
.unwrap();
.res_async())?;

let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap();
let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1
.declare_subscriber("zenoh_matching_status_any_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2
.declare_subscriber("zenoh_matching_status_any_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());
});
Ok(())
})
}

#[cfg(feature = "unstable")]
#[test]
fn zenoh_matching_status_remote() {
fn zenoh_matching_status_remote() -> Result<()> {
use flume::RecvTimeoutError;

task::block_on(async {
zasync_executor_init!();

let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let session1 = ztimeout!(zenoh::open(config::peer()).res_async())?;

let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let session2 = ztimeout!(zenoh::open(config::peer()).res_async())?;

let publisher1 = ztimeout!(session1
.declare_publisher("zenoh_matching_status_remote_test")
.allowed_destination(Locality::Remote)
.res_async())
.unwrap();
.res_async())?;

let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap();
let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1
.declare_subscriber("zenoh_matching_status_remote_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2
.declare_subscriber("zenoh_matching_status_remote_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());
});

Ok(())
})
}

#[cfg(feature = "unstable")]
#[test]
fn zenoh_matching_status_local() {
fn zenoh_matching_status_local() -> Result<()> {
use flume::RecvTimeoutError;

task::block_on(async {
zasync_executor_init!();

let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let session1 = ztimeout!(zenoh::open(config::peer()).res_async())?;

let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap();
let session2 = ztimeout!(zenoh::open(config::peer()).res_async())?;

let publisher1 = ztimeout!(session1
.declare_publisher("zenoh_matching_status_local_test")
.allowed_destination(Locality::SessionLocal)
.res_async())
.unwrap();
.res_async())?;

let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap();
let matching_listener = ztimeout!(publisher1.matching_listener().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1
.declare_subscriber("zenoh_matching_status_local_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2
.declare_subscriber("zenoh_matching_status_local_test")
.res_async())
.unwrap();
.res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare().res_async()).unwrap();
ztimeout!(sub.undeclare().res_async())?;

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));

let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap();
let matching_status = ztimeout!(publisher1.matching_status().res_async())?;
assert!(!matching_status.matching_subscribers());
});

Ok(())
})
}

0 comments on commit cff5384

Please sign in to comment.