Skip to content

Commit

Permalink
Fix failing Liveliness Subscriber Undeclaration (#1283)
Browse files Browse the repository at this point in the history
* Undeclare subscribers at the end of liveliness tests

* Use `kind` in `Seesion::undeclare_subscriber_inner`

* Address review comments
  • Loading branch information
fuzzypixelz authored Jul 31, 2024
1 parent 5cfcccc commit bcda8ec
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 22 deletions.
48 changes: 38 additions & 10 deletions zenoh-ext/tests/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_liveliness_querying_subscriber_clique() {
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
Expand All @@ -81,12 +81,18 @@ async fn test_liveliness_querying_subscriber_clique() {
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
token1.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

token2.undeclare().await.unwrap();
sub.close().await.unwrap();

peer1.close().await.unwrap();
peer2.close().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand All @@ -106,7 +112,7 @@ async fn test_liveliness_querying_subscriber_brokered() {

zenoh_util::try_init_log_from_env();

let _router = {
let router = {
let mut c = config::default();
c.listen
.endpoints
Expand Down Expand Up @@ -168,7 +174,7 @@ async fn test_liveliness_querying_subscriber_brokered() {
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
Expand All @@ -179,12 +185,20 @@ async fn test_liveliness_querying_subscriber_brokered() {
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
token1.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

token2.undeclare().await.unwrap();
sub.close().await.unwrap();

router.close().await.unwrap();
client1.close().await.unwrap();
client2.close().await.unwrap();
client3.close().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down Expand Up @@ -244,7 +258,7 @@ async fn test_liveliness_fetching_subscriber_clique() {
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
Expand All @@ -255,12 +269,18 @@ async fn test_liveliness_fetching_subscriber_clique() {
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
token1.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

token2.undeclare().await.unwrap();
sub.close().await.unwrap();

peer1.close().await.unwrap();
peer2.close().await.unwrap();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand All @@ -280,7 +300,7 @@ async fn test_liveliness_fetching_subscriber_brokered() {

zenoh_util::try_init_log_from_env();

let _router = {
let router = {
let mut c = config::default();
c.listen
.endpoints
Expand Down Expand Up @@ -346,7 +366,7 @@ async fn test_liveliness_fetching_subscriber_brokered() {
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
Expand All @@ -357,10 +377,18 @@ async fn test_liveliness_fetching_subscriber_brokered() {
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
token1.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

token2.undeclare().await.unwrap();
sub.close().await.unwrap();

router.close().await.unwrap();
client1.close().await.unwrap();
client2.close().await.unwrap();
client3.close().await.unwrap();
}
5 changes: 1 addition & 4 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,10 +1238,7 @@ impl Session {

pub(crate) fn undeclare_subscriber_inner(&self, sid: Id, kind: SubscriberKind) -> ZResult<()> {
let mut state = zwrite!(self.state);
if let Some(sub_state) = state
.subscribers_mut(SubscriberKind::Subscriber)
.remove(&sid)
{
if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) {
trace!("undeclare_subscriber({:?})", sub_state);
for res in state
.local_resources
Expand Down
44 changes: 36 additions & 8 deletions zenoh/tests/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ async fn test_liveliness_subscriber_clique() {
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

drop(token);
token.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Delete);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

sub.undeclare().await.unwrap();

peer1.close().await.unwrap();
peer2.close().await.unwrap();
}

#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -114,7 +119,7 @@ async fn test_liveliness_query_clique() {
s
};

let _token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
let token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let get = ztimeout!(peer2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap();
Expand All @@ -123,6 +128,11 @@ async fn test_liveliness_query_clique() {
let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap();
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

token.undeclare().await.unwrap();

peer1.close().await.unwrap();
peer2.close().await.unwrap();
}

#[cfg(feature = "unstable")]
Expand All @@ -141,7 +151,7 @@ async fn test_liveliness_subscriber_brokered() {

zenoh_util::try_init_log_from_env();

let _router = {
let router = {
let mut c = config::default();
c.listen
.endpoints
Expand Down Expand Up @@ -190,12 +200,18 @@ async fn test_liveliness_subscriber_brokered() {
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

drop(token);
token.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Delete);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

sub.undeclare().await.unwrap();

router.close().await.unwrap();
client1.close().await.unwrap();
client2.close().await.unwrap();
}

#[cfg(feature = "unstable")]
Expand All @@ -213,7 +229,7 @@ async fn test_liveliness_query_brokered() {

zenoh_util::try_init_log_from_env();

let _router = {
let router = {
let mut c = config::default();
c.listen
.endpoints
Expand Down Expand Up @@ -252,7 +268,7 @@ async fn test_liveliness_query_brokered() {
s
};

let _token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
let token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let get = ztimeout!(client2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap();
Expand All @@ -261,6 +277,12 @@ async fn test_liveliness_query_brokered() {
let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap();
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

token.undeclare().await.unwrap();

router.close().await.unwrap();
client1.close().await.unwrap();
client2.close().await.unwrap();
}

#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -295,12 +317,15 @@ async fn test_liveliness_subscriber_local() {
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

drop(token);
token.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Delete);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

sub.undeclare().await.unwrap();
peer.close().await.unwrap();
}

#[cfg(feature = "unstable")]
Expand All @@ -325,7 +350,7 @@ async fn test_liveliness_query_local() {
s
};

let _token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
let token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let get = ztimeout!(peer.liveliness().get(LIVELINESS_KEYEXPR)).unwrap();
Expand All @@ -334,4 +359,7 @@ async fn test_liveliness_query_local() {
let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap();
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

token.undeclare().await.unwrap();
peer.close().await.unwrap();
}

0 comments on commit bcda8ec

Please sign in to comment.